# coding: utf-8 # ------------------------------------------------------------------- # 宝塔Linux面板 # ------------------------------------------------------------------- # Copyright (c) 2015-2099 宝塔软件(http://bt.cn) All rights reserved. # ------------------------------------------------------------------- # Author: hwliang # ------------------------------------------------------------------- # 角色说明: # read:允许用户读取指定数据库 # readWrite:允许用户读写指定数据库 # dbAdmin:允许用户在指定数据库中执行管理函数,如索引创建、删除,查看统计或访问system.profile # userAdmin:允许用户向system.users集合写入,可以找指定数据库里创建、删除和管理用户 # clusterAdmin:只在admin数据库中可用,赋予用户所有分片和复制集相关函数的管理权限。 # readAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的读权限 # readWriteAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的读写权限 # userAdminAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的userAdmin权限 # dbAdminAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的dbAdmin权限。 # root:只在admin数据库中可用。超级账号,超级权限 # sqlite模型 # ------------------------------ import os import re import json import time import yaml from typing import Tuple,Any,Union from databaseModel.base import databaseBase import public try: import pymongo except: public.ExecShell("btpip install pymongo") public.ExecShell("rm -f /www/server/panel/pyenv/lib/python3.7/site-packages/pymongo/__pycache__/*.pyc") import pymongo try: from BTPanel import session except: pass class panelMongoDB(): DEFUALT_DB = ["admin", "config", "local"] CONFIG_PATH = os.path.join(public.get_setup_path(), "mongodb/config.conf") def __init__(self): self.check_package() self.__CONN_KWARGS = { "host": "localhost", "port": 27017, "username": None, "password": None, "socketTimeoutMS": 3000, # 套接字超时时间 "connectTimeoutMS": 3000, # 连接超时时间 "serverSelectionTimeoutMS": 3000, # 服务器选择超时时间 } self.__DB_CONN = None # 检查python包是否存在 @classmethod def check_package(cls): """ @name检测依赖是否正常 """ try: import pymongo except: public.ExecShell("btpip install pymongo") try: import pymongo except: return False return True # 连接MongoDB数据库 def connect(self) -> Tuple[bool, str]: auth = self.get_config_options("security", "authorization", "disabled") == "enabled" is_localhost = self.__CONN_KWARGS["host"] in ["localhost", "127.0.0.1"] # 本地连接自动补充 port username password if is_localhost: self.__CONN_KWARGS["port"] = self.get_config_options("net", "port", 27017) if auth: if self.__CONN_KWARGS.get("username") is None and auth: # 自动补充 username self.__CONN_KWARGS["username"] = "root" if self.__CONN_KWARGS.get("password") is None: # 自动补充 password mongodb_root_path = os.path.join(public.get_panel_path(), "data/mongo.root") if not os.path.exists(mongodb_root_path): return False, "本地登录密码为空" self.__CONN_KWARGS["password"] = public.readFile(mongodb_root_path) if not isinstance(self.__CONN_KWARGS["port"], int): self.__CONN_KWARGS["port"] = int(self.__CONN_KWARGS["port"]) err_msg = "" try: self.__DB_CONN = pymongo.MongoClient(**self.__CONN_KWARGS) self.__DB_CONN.admin.command({"listDatabases": 1}) return True, "正常" except Exception as err: err_msg = str(err) return False, err_msg # 设置连接参数 def set_host(self, *args, **kwargs): """ 设置连接参数 """ # args 兼容老版本,后续新增禁止使用 args if len(args) >= 5: kwargs["host"] = args[0] kwargs["port"] = args[1] kwargs["username"] = args[2] kwargs["password"] = args[3] if kwargs.get("db_host") is not None: kwargs["host"] = kwargs.get("db_host") if kwargs.get("db_port") is not None: kwargs["port"] = kwargs.get("db_port") if kwargs.get("db_user") is not None: kwargs["username"] = kwargs.get("db_user") if kwargs.get("db_password") is not None: kwargs["password"] = kwargs.get("db_password") self.__CONN_KWARGS.update(kwargs) if not isinstance(self.__CONN_KWARGS["port"], int): self.__CONN_KWARGS["port"] = int(self.__CONN_KWARGS["port"]) return self # 已弃用 def get_db_obj(self, db_name="admin"): if self.__DB_CONN is None: status, err_msg = self.connect() if status is False: return err_msg return self.__DB_CONN[db_name] # 新方法 def get_db_obj_new(self, db_name="admin") -> Tuple[bool, Any]: if self.__DB_CONN is None: status, err_msg = self.connect() if status is False: return status, "连接数据库[{}:{}]失败! {}".format(self.__CONN_KWARGS["db_host"], self.__CONN_KWARGS["db_port"], err_msg) return True, self.__DB_CONN[db_name] # 获取配置文件 @classmethod def get_config(cls, name: str = None, default=None) -> dict: config_data = public.readFile(cls.CONFIG_PATH) try: config = yaml.safe_load(config_data) except: config = { "systemLog": { "destination": "file", "logAppend": True, "path": "/www/server/mongodb/log/config.log" }, "storage": { "dbPath": "/www/server/mongodb/data", "directoryPerDB": True, "journal": { "enabled": True } }, "processManagement": { "fork": True, "pidFilePath": "/www/server/mongodb/log/configsvr.pid" }, "net": { "port": 27017, "bindIp": "0.0.0.0" }, "security": { "authorization": "enabled", "javascriptEnabled": False } } if name is not None: config.get(name, default) return config # 获取未注释的配置文件参数 @classmethod def get_config_options(cls, key: str, name: str, default=None): config = cls.get_config() config_info = config.get(key) if config_info is None: return default return config_info.get(name, default) # 获取配置项 @classmethod def get_options(cls, *args, **kwargs): config_info = { "port": 27017, "bind_ip": "127.0.0.1", "logpath": "", "dbpath": "", "authorization": "disabled" } if not os.path.exists(cls.CONFIG_PATH): return config_info conf = public.readFile(cls.CONFIG_PATH) for opt in config_info.keys(): tmp = re.findall(opt + ":\s+(.+)", conf) if not tmp: continue config_info[opt] = tmp[0] # public.writeFile("/www/server/1.txt",json.dumps(data)) return config_info # 重启 mongodb 服务 @classmethod def restart_localhost_services(cls): """ @重启服务 """ public.ExecShell('/etc/init.d/mongodb restart') @classmethod def set_auth_open(cls, status): """ @设置数据库密码访问开关 @状态 status:1 开启,2:关闭 """ conf = public.readFile(cls.CONFIG_PATH) if conf: if status: conf = re.sub('authorization\s*\:\s*disabled', 'authorization: enabled', conf) else: conf = re.sub('authorization\s*\:\s*enabled', 'authorization: disabled', conf) public.writeFile(cls.CONFIG_PATH, conf) cls.restart_localhost_services() return True class main(databaseBase): _DB_BACKUP_DIR = os.path.join(public.M("config").where("id=?", (1,)).getField("backup_path"), "database") _MONGODB_BACKUP_DIR = os.path.join(_DB_BACKUP_DIR, "mongodb") _MONGODBDUMP_BIN = os.path.join(public.get_setup_path(), "mongodb/bin/mongodump") _MONGOEXPORT_BIN = os.path.join(public.get_setup_path(), "mongodb/bin/mongoexport") _MONGORESTORE_BIN = os.path.join(public.get_setup_path(), "mongodb/bin/mongorestore") _MONGOIMPORT_BIN = os.path.join(public.get_setup_path(), "mongodb/bin/mongoimport") _MONGO_ROLE_DICT = { # 数据库用户角色 "read": "读取数据(read)", "readWrite": "读取和写入数据(readWrite)", # 数据库管理角色 # "dbAdmin": "数据库管理员", "dbOwner": "数据库所有者(dbOwner)", "userAdmin": "用户管理员(userAdmin)", # 集群管理角色 # "clusterAdmin": "集群管理员", # "clusterManager": "集群管理器", # "clusterMonitor": "集群监视器", # "hostManager": "主机管理员", # 备份和恢复角色 # "backup": "备份数据", # "restore": "还原数据", # 所有数据库角色 # "readAnyDatabase": "任意数据库读取", # "readWriteAnyDatabase": "任意数据库读取和写入", # "userAdminAnyDatabase": "任意数据库用户管理员", # "dbAdminAnyDatabase": "任意数据库管理员", # 超级用户角色 # "root": "超级管理员", # 内部角色 # "__queryableBackup": "可查询备份", # "__system": "系统角色", # "enableSharding": "启用分片", } def __init__(self): if not os.path.exists(self._MONGODB_BACKUP_DIR): os.makedirs(self._MONGODB_BACKUP_DIR,exist_ok=True) def get_list(self, get): """ @获取数据库列表 @sql_type = sqlserver """ search = '' if 'search' in get: search = get['search'] conditions = '' if '_' in search: cs = '' for i in search: if i == '_': cs += '/_' else: cs += i search = cs conditions = " escape '/'" SQL = public.M('databases'); where = "lower(type) = lower('mongodb')" if search: where += "AND (name like '%{search}%' or ps like '%{search}%'{conditions})".format(search=search, conditions=conditions) if 'db_type' in get: where += " AND db_type='{}'".format(get['db_type']) if 'sid' in get: where += " AND sid='{}'".format(get['sid']) order = "id desc" if hasattr(get, 'order'): order = get.order info = {} rdata = {} info['p'] = 1 info['row'] = 20 result = '1,2,3,4,5,8' info['count'] = SQL.where(where, ()).count(); if hasattr(get, 'limit'): info['row'] = int(get.limit) if hasattr(get, 'result'): result = get.result; if hasattr(get, 'p'): info['p'] = int(get['p']) import page # 实例化分页类 page = page.Page(); info['uri'] = get info['return_js'] = '' if hasattr(get, 'tojs'): info['return_js'] = get.tojs rdata['where'] = where; # 获取分页数据 rdata['page'] = page.GetPage(info, result) # 取出数据 rdata['data'] = SQL.where(where, ()).order(order).field( 'id,sid,pid,name,username,password,accept,ps,addtime,type,db_type,conn_config').limit( str(page.SHIFT) + ',' + str(page.ROW)).select() for sdata in rdata['data']: # 清除不存在的 backup_count = 0 backup_list = public.M('backup').where("pid=? AND type=1", (sdata['id'])).select() for backup in backup_list: if not os.path.exists(backup["filename"]): public.M('backup').where("id=? AND type=1", (backup['id'])).delete() continue backup_count += 1 sdata['backup_count'] = backup_count sdata['conn_config'] = json.loads(sdata['conn_config']) return rdata; def GetCloudServer(self, get): ''' @name 获取远程服务器列表 @author hwliang<2021-01-10> @return list ''' where = '1=1' if 'type' in get: where = "db_type = '{}'".format(get['type']) data = public.M('database_servers').where(where, ()).select() if not isinstance(data, list): data = [] if get['type'] == 'mysql': bt_mysql_bin = public.get_mysql_info()['path'] + '/bin/mysql.exe' if os.path.exists(bt_mysql_bin): data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 3306, 'db_user': 'root', 'db_password': '', 'ps': '本地服务器', 'addtime': 0, 'db_type': 'mysql'}) elif get['type'] == 'sqlserver': pass elif get['type'] == 'mongodb': if os.path.exists('/www/server/mongodb/bin'): data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 27017, 'db_user': 'root', 'db_password': '', 'ps': '本地服务器', 'addtime': 0, 'db_type': 'mongodb'}) elif get['type'] == 'redis': if os.path.exists('/www/server/redis'): data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 6379, 'db_user': 'root', 'db_password': '', 'ps': '本地服务器', 'addtime': 0, 'db_type': 'redis'}) elif get['type'] == 'pgsql': if os.path.exists('/www/server/pgsql'): data.insert(0, {'id': 0, 'db_host': '127.0.0.1', 'db_port': 5432, 'db_user': 'postgres', 'db_password': '', 'ps': '本地服务器', 'addtime': 0, 'db_type': 'pgsql'}) return data def AddCloudServer(self, get): """ @name 添加远程服务器 @author hwliang<2021-01-10> @param db_host 服务器地址 @param db_port 数据库端口 @param db_user 用户名 @param db_password 数据库密码 @param db_ps 数据库备注 @param type 数据库类型,mysql/sqlserver/sqlite @return dict """ arrs = ['db_host', 'db_port', 'db_user', 'db_password', 'db_ps', 'type'] if get.type == 'redis': arrs = ['db_host', 'db_port', 'db_password', 'db_ps', 'type'] for key in arrs: if key not in get: return public.returnMsg(False, '参数传递错误,缺少参数{}!'.format(key)) get['db_name'] = None mongodb_obj = panelMongoDB().set_host(host=get.get("db_host"), port=get.get("db_port"), username=get.get("db_user"), password=get.get("db_password")) status, err_msg = mongodb_obj.connect() if status is False: return public.returnMsg(False, "连接数据库失败!") if public.M('database_servers').where('db_host=? AND db_port=?', (get['db_host'], get['db_port'])).count(): return public.returnMsg(False, '指定服务器已存在: [{}:{}]'.format(get['db_host'], get['db_port'])) get['db_port'] = int(get['db_port']) pdata = { 'db_host': get['db_host'], 'db_port': int(get['db_port']), 'db_user': get['db_user'], 'db_password': get['db_password'], 'db_type': get['type'], 'ps': public.xssencode2(get['db_ps'].strip()), 'addtime': int(time.time()) } result = public.M("database_servers").insert(pdata) if isinstance(result, int): public.WriteLog('数据库管理', '添加远程MySQL服务器[{}:{}]'.format(get['db_host'], get['db_port'])) return public.returnMsg(True, '添加成功!') return public.returnMsg(False, '添加失败: {}'.format(result)) def RemoveCloudServer(self, get): ''' @删除远程数据库 ''' id = int(get.id) if not id: return public.returnMsg(False, '参数传递错误,请重试!') db_find = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('mongodb')", (id,)).find() if not db_find: return public.returnMsg(False, '指定远程服务器不存在!') public.M('databases').where('sid=?', id).delete() result = public.M('database_servers').where("id=? AND LOWER(db_type)=LOWER('mongodb')", id).delete() if isinstance(result, int): public.WriteLog('数据库管理', '删除远程MySQL服务器[{}:{}]'.format(db_find['db_host'], int(db_find['db_port']))) return public.returnMsg(True, '删除成功!') return public.returnMsg(False, '删除失败: {}'.format(result)) def ModifyCloudServer(self, get): ''' @name 修改远程服务器 @author hwliang<2021-01-10> @param id 远程服务器ID @param db_host 服务器地址 @param db_port 数据库端口 @param db_user 用户名 @param db_password 数据库密码 @param db_ps 数据库备注 @return dict ''' arrs = ['db_host', 'db_port', 'db_user', 'db_password', 'db_ps', 'type'] if get.type == 'redis': arrs = ['db_host', 'db_port', 'db_password', 'db_ps', 'type'] for key in arrs: if key not in get: return public.returnMsg(False, '参数传递错误,缺少参数{}!'.format(key)) id = int(get.id) get['db_port'] = int(get['db_port']) db_find = public.M('database_servers').where('id=?', (id,)).find() if not db_find: return public.returnMsg(False, '指定远程服务器不存在!') _modify = False if db_find['db_host'] != get['db_host'] or db_find['db_port'] != get['db_port']: _modify = True if public.M('database_servers').where('db_host=? AND db_port=?', (get['db_host'], get['db_port'])).count(): return public.returnMsg(False, '指定服务器已存在: [{}:{}]'.format(get['db_host'], get['db_port'])) if db_find['db_user'] != get['db_user'] or db_find['db_password'] != get['db_password']: _modify = True _modify = True # if _modify: # res = self.check_cloud_database(get) # if res.get("db_status", False) is False: # return res pdata = { 'db_host': get['db_host'], 'db_port': int(get['db_port']), 'db_user': get['db_user'], 'db_password': get['db_password'], 'db_type': get['type'], 'ps': public.xssencode2(get['db_ps'].strip()) } result = public.M("database_servers").where('id=?', (id,)).update(pdata) if isinstance(result, int): public.WriteLog('数据库管理', '修改远程MySQL服务器[{}:{}]'.format(get['db_host'], get['db_port'])) return public.returnMsg(True, '修改成功!') return public.returnMsg(False, '修改失败: {}'.format(result)) def set_auth_status(self, get): """ @设置密码认证状态 @status int 0:关闭,1:开启 """ if not public.process_exists("mongod"): return public.returnMsg(False, "Mongodb服务还未开启!") status = int(get.status) path = '{}/data/mongo.root'.format(public.get_panel_path()) if status: if hasattr(get, 'password'): password = get['password'].strip() if not password or not re.search("^[\w@\.]+$", password): return public.returnMsg(False, '数据库密码不能为空或带有特殊字符') if re.search('[\u4e00-\u9fa5]', password): return public.returnMsg(False, '数据库密码不能为中文,请换个名称!') else: password = public.GetRandomString(16) panelMongoDB.set_auth_open(False) status, mongodb_obj = self.get_obj_by_sid(0) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new("admin") if status is False: return public.returnMsg(False, db_obj) try: db_obj.command("dropUser", "root") except: pass db_obj.command("createUser", "root", pwd=password, roles=[ {'role': 'root', 'db': 'admin'}, {'role': 'clusterAdmin', 'db': 'admin'}, {'role': 'readAnyDatabase', 'db': 'admin'}, {'role': 'readWriteAnyDatabase', 'db': 'admin'}, {'role': 'userAdminAnyDatabase', 'db': 'admin'}, {'role': 'dbAdminAnyDatabase', 'db': 'admin'}, {'role': 'userAdmin', 'db': 'admin'}, {'role': 'dbAdmin', 'db': 'admin'} ]) panelMongoDB.set_auth_open(True) public.writeFile(path, password) else: if os.path.exists(path): os.remove(path) panelMongoDB.set_auth_open(False) return public.returnMsg(True, '操作成功.') def get_obj_by_sid(self, sid=0, conn_config=None) -> Tuple[bool, Union[str,panelMongoDB]]: """ @取mssql数据库对像 By sid @sid 数据库分类,0:本地 """ if type(sid) == str: try: sid = int(sid) except: sid = 0 if sid: if not conn_config: conn_config = public.M('database_servers').where("id=? AND LOWER(db_type)=LOWER('mongodb')", sid).find() mongodb_obj = panelMongoDB().set_host(host=conn_config["db_host"], port=conn_config["db_port"], username=conn_config["db_user"], password=conn_config["db_password"]) status, err_msg = mongodb_obj.connect() if status is False: return status, "连接数据库[{}:{}]失败".format(conn_config["db_host"], int(conn_config["db_port"])) return status, mongodb_obj else: mongodb_obj = panelMongoDB() status, err_msg = mongodb_obj.connect() if status is False: return status, "连接数据库 [localhost:27017] 失败!{}".format(err_msg) return status, mongodb_obj def AddDatabase(self, get): """ @添加数据库 """ try: sid = int(get.sid) except: return public.returnMsg(False, '数据库类型sid需要int类型!') if not int(get.sid) and not public.process_exists("mongod"): return public.returnMsg(False, "Mongodb服务还未开启!") dtype = 'MongoDB' username = '' password = '' auth_status = panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled" # auth为true时如果__DB_USER为空则将它赋值为 root,用于开启本地认证后数据库用户为空的情况 data_name = get.name.strip() if not data_name: return public.returnMsg(False, "数据库名不能为空!") if auth_status: res = self.add_base_database(get, dtype) if not res['status']: return res data_name = res['data_name'] username = res['username'] password = res['data_pwd'] else: username = data_name # 检查数据库名称是否含有非法字符 if any(char in data_name for char in '/\\. "$*<>:|?'): return public.returnMsg(False, "数据库名不能包含以下字符: /\\. \"$*<>:|?") if ' ' in data_name: return public.returnMsg(False, '数据库名称包含空格,无法正常添加!') sql = public.M('databases') if sql.where("(username=?) AND LOWER(type)=LOWER('MongoDB')", (username)).count(): return public.returnMsg(False, '数据库用户已存在,请使用其他数据库名称!') status, mongodb_obj = self.get_obj_by_sid(get.sid) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(data_name) if status is False: return public.returnMsg(False, db_obj) if not hasattr(get, 'ps'): get['ps'] = public.getMsg('INPUT_PS') addTime = time.strftime('%Y-%m-%d %X', time.localtime()) pid = 0 if hasattr(get, 'pid'): pid = get.pid if hasattr(get, 'contact'): site = public.M('sites').where("id=?", (get.contact,)).field('id,name').find() if site: pid = int(get.contact) get['ps'] = site['name'] db_type = 0 if sid: db_type = 2 db_obj.chat.insert_one({}) if auth_status: db_obj.command("createUser", username, pwd=password, roles=[{'role': 'dbOwner', 'db': data_name}, {'role': 'userAdmin', 'db': data_name}]) public.set_module_logs('linux_mongodb', 'AddDatabase', 1) # 添加入SQLITE public.M('databases').add('pid,sid,db_type,name,username,password,accept,ps,addtime,type', (pid, sid, db_type, data_name, username, password, '127.0.0.1', get['ps'], addTime, dtype)) public.WriteLog("TYPE_DATABASE", 'DATABASE_ADD_SUCCESS', (data_name,)) return public.returnMsg(True, 'ADD_SUCCESS') def DeleteDatabase(self, get): """ @删除数据库 """ id = get['id'] find = public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", (id,)).field('id,pid,name,username,password,type,accept,ps,addtime,sid,db_type').find() if not find: return public.returnMsg(False, '指定数据库不存在.') try: int(find['sid']) except: return public.returnMsg(False, '数据库类型sid需要int类型!') if not public.process_exists("mongod") and not int(find['sid']): return public.returnMsg(False, "Mongodb服务还未开启!") name = get['name'] username = find['username'] if name == "admin": return public.returnMsg(False, '因Mongodb限制,禁止删除admin名称数据库!') status, mongodb_obj = self.get_obj_by_sid(find['sid']) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(name) if status is False: return public.returnMsg(False, db_obj) try: db_obj.command("dropUser", username) except: pass db_obj.command('dropDatabase') # 删除SQLITE public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", (id,)).delete() public.WriteLog("TYPE_DATABASE", 'DATABASE_DEL_SUCCESS', (name,)) return public.returnMsg(True, 'DEL_SUCCESS') def get_info_by_db_id(self, db_id): """ @获取数据库连接详情 @db_id 数据库id """ find = public.M('databases').where("id=? AND LOWER(type)=LOWER('mongodb')", db_id).find() if not find: return False if find["db_type"] == 1: # 远程数据库 conn_config = json.loads(find["conn_config"]) db_host = conn_config["db_host"] db_port = conn_config["db_port"] elif find["db_type"] == 2: conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('mongodb')", find["sid"]).find() db_host = conn_config["db_host"] db_port = conn_config["db_port"] else: # 本地数据库 db_host = '127.0.0.1' db_port = panelMongoDB.get_config_options("net", "port", 27017) data = { 'db_name': find["name"], 'db_host': db_host, 'db_port': int(db_port), 'db_user': find['username'], 'db_password': find['password'], } return data # 备份数据库 def ToBackup(self, get): """ 备份数据库 """ if not os.path.exists(self._MONGODBDUMP_BIN): return public.returnMsg(False, "缺少备份工具,请先通过软件管理安装MongoDB!") if not os.path.exists(self._MONGOEXPORT_BIN): return public.returnMsg(False, "缺少备份工具,请先通过软件管理安装MongoDB!") if not hasattr(get, "id"): return public.returnMsg(False, "缺少参数!id") db_id = get.id file_type = getattr(get, "file_type", "bson") collection_list = getattr(get, "collection_list", []) field_list = getattr(get, "field_list", []) db_find = public.M("databases").where("id=? AND LOWER(type)=LOWER('mongodb')", (db_id,)).find() if not db_find: return public.returnMsg(False, "数据库不存在!{db_id}".format(db_id=db_id)) if not public.process_exists("mongod") and not int(db_find["sid"]): return public.returnMsg(False, "Mongodb服务还未开启!") if file_type not in ["bson", "json", "csv"]: return public.returnMsg(False, "当前支持 bson json csv 格式!") if file_type == "csv" and len(field_list) == 0: return public.returnMsg(False, "导出为 csv 格式时需要指定导出字段!") db_name = db_find["name"] db_host = "127.0.0.1" db_user = db_find["username"] db_password = db_find["password"] conn_data = {} if db_find["db_type"] == 0: if panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled": if not db_password: return public.returnMsg(False, "数据库密码为空!请先设置数据库密码!") else: db_password = None db_port = panelMongoDB.get_config_options("net", "port", 27017) elif db_find["db_type"] == 1: if not db_password: return public.returnMsg(False, "数据库密码为空!请先设置数据库密码!") # 远程数据库 conn_config = json.loads(db_find["conn_config"]) db_host = conn_config["db_host"] db_port = conn_config["db_port"] conn_data["host"] = conn_config["db_host"] conn_data["port"] = conn_config["db_port"] conn_data["username"] = conn_config["db_user"] conn_data["password"] = conn_config["db_password"] elif db_find["db_type"] == 2: if not db_password: return public.returnMsg(False, "MongoDB已经开启安全认证,数据库密码不能为空,请设置密码后再试!") conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('mongodb')", db_find["sid"]).find() db_host = conn_config["db_host"] db_port = conn_config["db_port"] conn_data["host"] = conn_config["db_host"] conn_data["port"] = conn_config["db_port"] conn_data["username"] = conn_config["db_user"] conn_data["password"] = conn_config["db_password"] else: return public.returnMsg(False, "未知的数据库类型") mongodb_obj = panelMongoDB().set_host(**conn_data) status, err_msg = mongodb_obj.connect() if status is False: return public.returnMsg(False, "连接数据库[{}:{}]失败".format(db_host, int(db_port))) # if len(collection_list) == 0: # try: # sql_obj = mongodb_obj.get_db_obj_new(db_name) # collection_list = sql_obj.list_collection_names() # except Exception as err: # return public.returnMsg(False, "连接数据库[{}:{}]失败".format(db_host, int(db_port))) db_backup_dir = os.path.join(self._MONGODB_BACKUP_DIR, db_name) if not os.path.exists(db_backup_dir): os.makedirs(db_backup_dir) file_name = "{db_name}_{file_type}_{backup_time}_mongodb_data".format(db_name=db_name, file_type=file_type, backup_time=time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())) export_dir = os.path.join(db_backup_dir, file_name) # if not os.path.isdir(export_dir): # os.makedirs(export_dir) mongodump_shell = "'{mongodump_bin}' --host='{db_host}' --port={db_port} --db='{db_name}' --out='{out}'".format( mongodump_bin=self._MONGODBDUMP_BIN, db_host=db_host, db_port=int(db_port), db_name=db_name, out=export_dir, ) mongoexport_shell = "'{mongoexport_bin}' --host='{db_host}' --port={db_port} --db='{db_name}'".format( mongoexport_bin=self._MONGOEXPORT_BIN, db_host=db_host, db_port=int(db_port), db_name=db_name, ) if db_password is not None: # 本地未开启安全认证 mongodump_shell += " --username='{db_user}' --password={db_password}".format(db_user=db_user, db_password=public.shell_quote(str(db_password))) mongoexport_shell += " --username='{db_user}' --password={db_password}".format(db_user=db_user, db_password=public.shell_quote(str(db_password))) backup_ps = "手动备份" if file_type == "bson": if len(collection_list) == 0: public.ExecShell(mongodump_shell) else: backup_ps += "-bson" for collection_name in collection_list: shell = "{mongodump_shell} --collection='{collection}'".format( mongodump_shell=mongodump_shell, collection=collection_name ) public.ExecShell(shell) else: # 导出 json csv 格式 backup_ps += "-json" fields = None if file_type == "csv": # csv fields = "--fields='{}'".format(",".join(field_list)) for collection_name in collection_list: file_path = os.path.join(export_dir, "{collection_name}.{file_type}".format(collection_name=collection_name, file_type=file_type)) shell = "{mongoexport_shell} --collection='{collection}' --type='{type}' --out='{out}'".format( mongoexport_shell=mongoexport_shell, collection=collection_name, type=file_type, out=file_path, ) if fields is not None: shell += " --fields='{fields}'".format(fields=fields) public.ExecShell(shell) if not os.path.exists(export_dir): return public.returnMsg(False, "数据库备份失败,导出文件不存在!") backup_path = "{export_dir}.zip".format(export_dir=export_dir) public.ExecShell("cd {backup_dir} && zip -m {backup_path} -r {file_name}".format(backup_dir=db_backup_dir, backup_path=backup_path, file_name=file_name)) if not os.path.exists(backup_path): public.ExecShell("rm -rf {}".format(export_dir)) return public.returnMsg(True, "备份失败!") backup_size = os.path.getsize(backup_path) public.M("backup").add("type,name,pid,filename,size,addtime,ps", (1, os.path.basename(backup_path), db_id, backup_path, backup_size, time.strftime("%Y-%m-%d %X", time.localtime()), backup_ps)) public.WriteLog("TYPE_DATABASE", "DATABASE_BACKUP_SUCCESS", (db_name,)) if backup_size < 1: return public.returnMsg(True, "备份执行成功,备份文件小于1b,请检查备份完整性.") else: return public.returnMsg(True, "BACKUP_SUCCESS") # 导入 def InputSql(self, get): if not os.path.exists(self._MONGORESTORE_BIN): return public.returnMsg(False, "缺少备份工具,请先通过软件管理安装MongoDB!") if not os.path.exists(self._MONGOIMPORT_BIN): return public.returnMsg(False, "缺少备份工具,请先通过软件管理安装MongoDB!") if not hasattr(get, "name"): return public.returnMsg(False, "缺少参数!name") if not hasattr(get, "file"): return public.returnMsg(False, "缺少参数!file") db_name = get.name file = get.file if not os.path.exists(file): return public.returnMsg(False, "导入路径不存在!") if not os.path.isfile(file): return public.returnMsg(False, "仅支持导入压缩文件!") db_find = public.M("databases").where("name=? AND LOWER(type)=LOWER('MongoDB')", (db_name,)).find() if not db_find: return public.returnMsg(False, "数据库不存在!") if not public.process_exists("mongod") and not int(db_find["sid"]): return public.returnMsg(False, "Mongodb服务还未开启!") file_name = os.path.basename(file) ext_list = ['json', 'csv', 'tar.gz', 'zip'] ext_tmp = file_name.split(".") file_ext = ".".join(ext_tmp[1:]) ext_temp = [ext.lower() for ext in ext_list if ext.lower() in file_ext] if len(ext_temp) == 0: return public.returnMsg(False, "请选择json、csv、tar.gz、zip文件格式!") input_dir = os.path.join(self._MONGODB_BACKUP_DIR, db_name, "input_tmp_{}".format(int(time.time() * 1000_000))) is_zip = False if "zip" in file_ext: if not os.path.isdir(input_dir): os.makedirs(input_dir) public.ExecShell("unzip -o '{file}' -d {input_dir}".format(file=file, input_dir=input_dir)) is_zip = True elif "tar.gz" in file_ext: if not os.path.isdir(input_dir): os.makedirs(input_dir) public.ExecShell("tar zxf '{file}' -C {input_dir}".format(file=file, input_dir=input_dir)) is_zip = True elif "gz" in file_ext: if not os.path.isdir(input_dir): os.makedirs(input_dir) temp_file = os.path.join(input_dir, file_name) public.ExecShell("cp '{file}' '{temp_file}' && gunzip -q '{temp_file}'".format(file=file, temp_file=temp_file)) is_zip = True input_path_list = [] if is_zip is True: def get_input_path(input_dir: str, input_path_list: list): for name in os.listdir(input_dir): path = os.path.join(input_dir, name) if os.path.isfile(path) and (path.endswith(".json") or path.endswith(".csv")): input_path_list.append(path) elif os.path.isdir(path): is_bson = False for t_name in os.listdir(path): t_path = os.path.join(path, t_name) if os.path.isfile(t_path) and t_path.endswith(".bson"): input_path_list.append(path) is_bson = True break if is_bson is False: get_input_path(path, input_path_list) get_input_path(input_dir, input_path_list) else: input_path_list.append(file) # json,csv db_name = db_find["name"] db_host = "127.0.0.1" db_user = db_find["username"] db_password = db_find["password"] if db_find["db_type"] == 0: if panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled": if not db_password: return public.returnMsg(False, "MongoDB已经开启安全认证,数据库密码不能为空,请设置密码后再试!") else: db_password = None db_port = panelMongoDB.get_config_options("net", "port", 27017) elif db_find["db_type"] == 1: # 远程数据库 conn_config = json.loads(db_find["conn_config"]) db_host = conn_config["db_host"] db_port = conn_config["db_port"] elif db_find["db_type"] == 2: conn_config = public.M("database_servers").where("id=? AND LOWER(db_type)=LOWER('mongodb')", db_find["sid"]).find() db_host = conn_config["db_host"] db_port = conn_config["db_port"] else: return public.returnMsg(False, "未知的数据库类型") status, err_msg = panelMongoDB().connect() if status is False: return public.returnMsg(False, "连接数据库[{}:{}]失败".format(db_host, int(db_port))) mongorestore_shell = "'{mongorestore}' --host='{host}' --port={port} --db='{db_name}' --drop ".format( mongorestore=self._MONGORESTORE_BIN, host=db_host, port=int(db_port), db_name=db_name, ) mongoimport_shell = "'{mongoimport}' --host='{host}' --port={port} --db='{db_name}' --drop ".format( mongoimport=self._MONGOIMPORT_BIN, host=db_host, port=int(db_port), db_name=db_name, ) if db_password is not None: # 本地未开启安全认证 mongorestore_shell += " --username='{db_user}' --password='{db_password}'".format(db_user=db_user, db_password=db_password) mongoimport_shell += " --username='{db_user}' --password='{db_password}'".format(db_user=db_user, db_password=db_password) for path in input_path_list: if os.path.isdir(path): # bson public.ExecShell("{mongorestore_shell} '{path}'".format(mongorestore_shell=mongorestore_shell, path=path)) elif os.path.isfile(path) and (path.endswith(".json") or path.endswith(".csv")): # json/csv fields = None if path.endswith(".csv"): # csv fp = open(path, "r") fields = fp.readline() fp.close() file_name = os.path.basename(path) collection = file_name.split(".")[0] file_type = file_name.split(".")[-1] shell = "{mongoimport_shell} --collection='{collection}' --file='{file}' --type='{type}'".format( mongoimport_shell=mongoimport_shell, collection=collection, file=path, type=file_type, ) if fields is not None: shell += " --fields='{fields}'".format(fields=fields) public.ExecShell(shell) # 清理导入临时目录 if is_zip is True: public.ExecShell("rm -rf {input_dir}".format(input_dir=input_dir)) public.WriteLog("TYPE_DATABASE", '导入数据库[{}]成功'.format(db_name)) return public.returnMsg(True, 'DATABASE_INPUT_SUCCESS') # 获取备份文件 def GetBackup(self, get): p = getattr(get, "p", 1) limit = getattr(get, "limit", 10) return_js = getattr(get, "return_js", "") search = getattr(get, "search", None) if not str(p).isdigit(): return public.returnMsg(False, "参数错误!p") if not str(limit).isdigit(): return public.returnMsg(False, "参数错误!limit") p = int(p) limit = int(limit) ext_list = ['json', 'csv', 'tar.gz', 'zip'] backup_list = [] # 递归获取备份文件 def get_dir_backup(backup_dir: str, backup_list: list, is_recursion: bool): for name in os.listdir(backup_dir): path = os.path.join(backup_dir, name) if os.path.isfile(path): ext = name.split(".")[-1] if ext.lower() not in ext_list: continue if search is not None and search not in name: continue stat_file = os.stat(path) path_data = { "name": name, "path": path, "size": stat_file.st_size, "mtime": int(stat_file.st_mtime), "ctime": int(stat_file.st_ctime), } backup_list.append(path_data) elif os.path.isdir(path) and is_recursion is True: get_dir_backup(path, backup_list, is_recursion) get_dir_backup(self._MONGODB_BACKUP_DIR, backup_list, True) get_dir_backup(self._DB_BACKUP_DIR, backup_list, False) try: from flask import request uri = public.url_encode(request.full_path) except: uri = '' # 包含分页类 import page # 实例化分页类 page = page.Page() info = { "p": p, "count": len(backup_list), "row": limit, "return_js": return_js, "uri": uri, } page_info = page.GetPage(info) start_idx = (int(p) - 1) * limit end_idx = p * limit if p * limit < len(backup_list) else len(backup_list) backup_list.sort(key=lambda data: data["mtime"], reverse=True) backup_list = backup_list[start_idx:end_idx] return {"status": True, "msg": "OK", "data": backup_list, "page": page_info} def DelBackup(self, get): """ @删除备份文件 """ try: name = '' id = get.id where = "id=?" filename = public.M('backup').where(where, (id,)).getField('filename') if os.path.exists(filename): os.remove(filename) if filename == 'qiniu': name = public.M('backup').where(where, (id,)).getField('name'); public.ExecShell(public.get_run_python("[PYTHON] " + public.GetConfigValue( 'setup_path') + '/panel/script/backup_qiniu.py delete_file ' + name)) public.M('backup').where(where, (id,)).delete() public.WriteLog("TYPE_DATABASE", 'DATABASE_BACKUP_DEL_SUCCESS', (name, filename)) return public.returnMsg(True, 'DEL_SUCCESS'); except Exception as err: return public.returnMsg(False, "删除备份文件失败!") # 同步数据库到服务器 def SyncToDatabases(self, get): type = int(get['type']) n = 0 sql = public.M('databases') if type == 0: data = sql.field('id,name,username,password,accept,type,sid,db_type').where("LOWER(type)=LOWER('MongoDB')", ()).select() for value in data: if value['db_type'] in ['1', 1]: continue # 跳过远程数据库 result = self.ToDataBase(value) if result == 1: n += 1 else: import json data = json.loads(get.ids) for value in data: find = sql.where("id=?", (value,)).field('id,name,username,password,sid,db_type,accept,type').find() result = self.ToDataBase(find) if result == 1: n += 1 if n == 1: return public.returnMsg(True, '同步成功') elif n == 0: return public.returnMsg(False, '同步失败') return public.returnMsg(True, 'DATABASE_SYNC_SUCCESS', (str(n),)) # 添加到服务器 def ToDataBase(self, find): if find['username'] == 'bt_default': return 0 if len(find['password']) < 3: find['username'] = find['name'] find['password'] = public.md5(str(time.time()) + find['name'])[0:10] public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", (find['id'],)).save('password,username', (find['password'], find['username'])) try: sid = int(find['sid']) except: return public.returnMsg(False, '数据库类型sid需要int类型!') if not public.process_exists("mongod") and not int(find['sid']): return public.returnMsg(False, "Mongodb服务还未开启!") get = public.dict_obj() get.sid = sid auth_status = panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled" if auth_status: status, mongodb_obj = self.get_obj_by_sid(sid) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(find['name']) if status is False: return public.returnMsg(False, db_obj) try: db_obj.chat.insert_one({}) db_obj.command("dropUser", find['username']) except: pass try: db_obj.command("createUser", find['username'], pwd=find['password'], roles=[{'role': 'dbOwner', 'db': find['name']}, {'role': 'userAdmin', 'db': find['name']}]) except: pass return 1 def SyncGetDatabases(self, get): """ @从服务器获取数据库 """ n = 0 s = 0 db_type = 0 if public.process_exists("mongod") and get.sid is None: sid = 0 else: sid = get.get('sid/d', 0) if sid: db_type = 2 try: int(get.sid) except: return public.returnMsg(False, '数据库类型sid需要int类型!') if not public.process_exists("mongod") and not int(get.sid): return public.returnMsg(False, "Mongodb服务还未开启!") status, mongodb_obj = self.get_obj_by_sid(sid) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new('admin') if status is False: return public.returnMsg(False, db_obj) data = db_obj.command({"listDatabases": 1}) sql = public.M('databases') for item in data['databases']: dbname = item['name'] if sql.where("name=? AND LOWER(type)=LOWER('MongoDB')", (dbname,)).count(): continue if dbname in panelMongoDB.DEFUALT_DB: continue if sql.table('databases').add('name,username,password,accept,ps,addtime,type,sid,db_type', (dbname, dbname, "", "", public.getMsg('INPUT_PS'), time.strftime('%Y-%m-%d %X', time.localtime()), 'MongoDB', sid, db_type)): n += 1 return public.returnMsg(True, 'DATABASE_GET_SUCCESS', (str(n),)) def ResDatabasePassword(self, get): """ @修改用户密码 """ id = get['id'] username = get['name'].strip() newpassword = public.trim(get['password']) try: if not newpassword: return public.returnMsg(False, '修改失败,数据库[' + username + ']密码不能为空.') if len(re.search("^[\w@\.]+$", newpassword).groups()) > 0: return public.returnMsg(False, '数据库密码不能为空或带有特殊字符') if re.search('[\u4e00-\u9fa5]', newpassword): return public.returnMsg(False, '数据库密码不能为中文,请换个名称!') except: return public.returnMsg(False, '数据库密码不能为空或带有特殊字符') find = public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", (id,)).field('id,pid,name,username,password,type,accept,ps,addtime,sid').find() if not find: return public.returnMsg(False, '修改失败,指定数据库不存在.') get = public.dict_obj() get.sid = find['sid'] try: int(find['sid']) except: return public.returnMsg(False, '数据库类型sid需要int类型!') if not public.process_exists("mongod") and not int(find['sid']): return public.returnMsg(False, "Mongodb服务还未开启!") auth_status = panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled" if auth_status: status, mongodb_obj = self.get_obj_by_sid(find['sid']) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(username) if status is False: return public.returnMsg(False, db_obj) try: db_obj.command("updateUser", username, pwd=newpassword) except: db_obj.command("createUser", username, pwd=newpassword, roles=[{'role': 'dbOwner', 'db': find['name']}, {'role': 'userAdmin', 'db': find['name']}]) else: return public.returnMsg(False, '修改失败,数据库未开启安全认证.') # 修改SQLITE public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", (id,)).setField('password', newpassword) public.WriteLog("TYPE_DATABASE", 'DATABASE_PASS_SUCCESS', (find['name'],)) return public.returnMsg(True, 'DATABASE_PASS_SUCCESS', (find['name'],)) def get_root_pwd(self, get): """ @获取root密码 """ config = panelMongoDB.get_config() config_info = { "port": config["net"].get("port", 27017), "bind_ip": config["net"].get("bindIp", "127.0.0.1"), "logpath": config["systemLog"].get("path", ""), "dbpath": config["storage"].get("dbPath", ""), "authorization": config["security"].get("authorization", "disabled") } sa_path = '{}/data/mongo.root'.format(public.get_panel_path()) if os.path.exists(sa_path): config_info['msg'] = public.readFile(sa_path) else: config_info['msg'] = '' config_info['root'] = config_info['msg'] return config_info def get_database_size_by_id(self, get): """ @获取数据库尺寸(批量删除验证) @get json/int 数据库id """ # if not public.process_exists("mongod"): # return public.returnMsg(False,"Mongodb服务还未开启!") total = 0 db_id = get if not isinstance(get, int): db_id = get['db_id'] find = public.M('databases').where("id=? AND LOWER(type)=LOWER('MongoDB')", db_id).find() try: int(find['sid']) except: return 0 if not public.process_exists("mongod") and not int(find['sid']): return 0 # try: # auth_status = panelMongoDB.get_config_options("security", "authorization", "disabled") == "enabled" # db_obj = self.get_obj_by_sid(find['sid']).get_db_obj_new(find['name']) # db_obj # db_obj.stats() # # total = tables[0][1] # if not total: total = 0 # except: # public.get_error_info() return total def check_del_data(self, args): """ @删除数据库前置检测 """ return self.check_base_del_data(args) def __new_password(self): """ 生成随机密码 """ import random import string # 生成随机密码 password = "".join(random.sample(string.ascii_letters + string.digits, 16)) return password # 数据库状态检测 def CheckDatabaseStatus(self, get): """ 数据库状态检测 """ if not hasattr(get, "sid"): return public.returnMsg(False, "缺少参数!sid") if not str(get.sid).isdigit(): return public.returnMsg(False, "参数错误!sid") sid = int(get.sid) mongodb_obj = panelMongoDB() if sid == 0: db_status, err_msg = mongodb_obj.connect() else: conn_config = public.M('database_servers').where("id=? AND LOWER(db_type)=LOWER('mongodb')", sid).find() if not conn_config: db_status = False err_msg = "远程数据库信息不存在!" else: mongodb_obj.set_host(host=conn_config.get("db_host"), port=conn_config.get("db_port"), username=conn_config.get("db_user"), password=conn_config.get("db_password")) db_status, err_msg = mongodb_obj.connect() return {"status": True, "msg": "正常" if db_status is True else "异常", "db_status": db_status, "err_msg": err_msg} def check_cloud_database_status(self, conn_config): """ @检测远程数据库是否连接 @conn_config 远程数据库配置,包含host port pwd等信息 旧方法,添加数据库时调用 """ try: mongodb_obj = panelMongoDB().set_host(host=conn_config.get("db_host"), port=conn_config.get("db_port"), username=conn_config.get("db_user"), password=conn_config.get("db_password")) status, err_msg = mongodb_obj.connect() return status except: return public.returnMsg(False, "远程数据库连接失败!") # 获取数据库集合 def GetInfo(self, get): """ 获取数据库集合 """ db_name = get.db_name db_find = public.M("databases").where("name=? AND LOWER(type)=LOWER('MongoDB')", (db_name,)).find() if not db_find: return public.returnMsg(False, "数据库不存在!") if not public.process_exists("mongod") and not int(db_find["sid"]): return public.returnMsg(False, "Mongodb服务还未开启!") status, mongodb_obj = self.get_obj_by_sid(db_find["sid"]) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(db_name) if status is False: return public.returnMsg(False, db_obj) result = db_obj.command("dbStats") result["collection_list"] = [] for collection_name in db_obj.list_collection_names(): collection = db_obj.command("collStats", collection_name) data = { "collection_name": collection_name, "count": collection.get("count"), # 文档数 "size": collection.get("size"), # 内存中的大小 "avg_obj_size": collection.get("avgObjSize"), # 对象平均大小 "storage_size": collection.get("storageSize"), # 存储大小 "capped": collection.get("capped"), "nindexes": collection.get("nindexes"), # 索引数 "total_index_size": collection.get("totalIndexSize"), # 索引大小 } result["collection_list"].append(data) return {"status": True, "msg": "ok", "data": result} def GetCollection(self, db_name, collection_name): """ 获取指定数据库、指定集合的集合对象 @param db_name: 数据库名 @param collection_name: 集合名 @return: 集合对象 """ db_find = public.M("databases").where( "name=? AND LOWER(type)=LOWER('MongoDB')", (db_name,) ).find() if not db_find: return False, "数据库不存在!" if not public.process_exists("mongod") and not int(db_find["sid"]): return False, "Mongodb服务还未开启!" status, mongodb_obj = self.get_obj_by_sid(db_find["sid"]) if status is False: return False, mongodb_obj status, db_obj = mongodb_obj.get_db_obj_new(db_name) if status is False: return False, db_obj if collection_name not in db_obj.list_collection_names(): return False, "集合不存在!" return True, db_obj[collection_name] def DeleteCollection(self, get): """ 删除整个集合 参数: - get.db_name - get.collection_name """ db_name = get.db_name collection_name = get.collection_name db_find = public.M("databases").where("name=? AND LOWER(type)=LOWER('MongoDB')", (db_name,)).find() if not db_find: return public.returnMsg(False, "数据库不存在!") status, mongodb_obj = self.get_obj_by_sid(db_find["sid"]) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(db_name) if status is False: return public.returnMsg(False, db_obj) if collection_name not in db_obj.list_collection_names(): return public.returnMsg(False, "集合不存在!") try: db_obj.drop_collection(collection_name) return { "status": True, "msg": f"集合 {collection_name} 删除成功" } except Exception as e: return public.returnMsg(False, f"删除集合失败: {str(e)}") def GetCollectionData(self, get): """ 获取指定数据库、指定集合的分页数据(支持筛选和字段过滤) 参数说明: get.db_name: 数据库名 get.collection_name: 集合名 get.page: 页码(默认1) get.page_size: 每页数量(默认20,最大100) get.query: JSON字符串,查询条件,例如 '{"age": {"$gt": 30}}' get.fields: 逗号分隔的字段名字符串,只返回指定字段,例如 'name,email' """ import json db_name = get.db_name collection_name = get.collection_name page = int(get.page) if hasattr(get, "page") else 1 page_size = int(get.page_size) if hasattr(get, "page_size") else 20 page_size = min(page_size, 100) # 限制最大返回100条,防止爆内存 query = {} if hasattr(get, "query") and get.query: try: if isinstance(get.query, str): query = json.loads(get.query) else: query = get.query except Exception as e: return public.returnMsg(False, f"查询条件解析错误: {str(e)}") projection = None if hasattr(get, "fields") and get.fields: # 生成 projection 字段格式 {'name': 1, 'email': 1} projection = {field.strip(): 1 for field in get.fields.split(",")} status, coll_or_msg = self.GetCollection(db_name, collection_name) if not status: return public.returnMsg(False, coll_or_msg) coll = coll_or_msg try: total_count = coll.count_documents(query) except Exception as e: return public.returnMsg(False, f"统计文档数失败: {str(e)}") from bson import json_util import json try: cursor = coll.find(query, projection).skip((page - 1) * page_size).limit(page_size) docs = list(cursor) json_data = json_util.dumps(docs) data = json.loads(json_data) except Exception as e: return public.returnMsg(False, f"获取数据失败: {str(e)}") return { "status": True, "msg": "ok", "data": { "db_name": db_name, "collection_name": collection_name, "page": page, "page_size": page_size, "total_count": total_count, "docs": data, } } def DeleteCollectionData(self, get): """ 删除指定数据库集合中的数据 参数: - get.db_name: 数据库名 - get.collection_name: 集合名 - get.delete_query: dict 或 JSON字符串,删除过滤条件(必填) 返回: - 删除匹配的文档数量 """ import json from bson import ObjectId db_name = get.db_name collection_name = get.collection_name status, coll_or_msg = self.GetCollection(db_name, collection_name) if not status: return public.returnMsg(False, coll_or_msg) coll = coll_or_msg if not hasattr(get, "delete_query") or not get.delete_query: return public.returnMsg(False, "缺少删除过滤条件(delete_query)") try: raw_query = get.delete_query if isinstance(raw_query, str): delete_query = json.loads(raw_query) elif isinstance(raw_query, dict): delete_query = raw_query else: return public.returnMsg(False, "delete_query 类型错误") except Exception as e: return public.returnMsg(False, f"删除条件解析失败: {str(e)}") if not delete_query: return public.returnMsg(False, "禁止无条件删除") if "_id" in delete_query and isinstance(delete_query["_id"], dict): if "$oid" in delete_query["_id"]: delete_query["_id"] = ObjectId(delete_query["_id"]["$oid"]) try: result = coll.delete_many(delete_query) return { "status": True, "msg": f"删除成功,删除了 {result.deleted_count} 条文档", "data": { "deleted_count": result.deleted_count } } except Exception as e: return public.returnMsg(False, f"删除失败: {str(e)}") def InsertCollectionData(self, get): """ 向指定数据库集合中新增数据 参数: - get.db_name: 数据库名 - get.collection_name: 集合名 - get.insert_data: dict / JSON字符串 / list(必填) 返回: - 新增文档的 _id 列表 """ import json from bson import ObjectId from datetime import datetime db_name = get.db_name collection_name = get.collection_name db_find = public.M("databases").where( "name=? AND LOWER(type)=LOWER('MongoDB')", (db_name,) ).find() if not db_find: return False, "数据库不存在!" status, mongodb_obj = self.get_obj_by_sid(db_find["sid"]) if status is False: return False, mongodb_obj status, db_obj = mongodb_obj.get_db_obj_new(db_name) if status is False: return False, db_obj coll = db_obj[collection_name] if not hasattr(get, "insert_data") or not get.insert_data: return public.returnMsg(False, "缺少插入数据(insert_data)") try: raw_data = get.insert_data if isinstance(raw_data, str): insert_data = json.loads(raw_data) elif isinstance(raw_data, (dict, list)): insert_data = raw_data else: return public.returnMsg(False, "insert_data 类型错误") except Exception as e: return public.returnMsg(False, f"插入数据解析失败: {str(e)}") if isinstance(insert_data, dict): insert_data = [insert_data] if not insert_data: return public.returnMsg(False, "插入数据不能为空") def normalize_doc(doc): for k, v in doc.items(): if isinstance(v, dict): if "$oid" in v: doc[k] = ObjectId(v["$oid"]) elif "$date" in v: # 毫秒时间戳 doc[k] = datetime.fromtimestamp(v["$date"] / 1000) else: normalize_doc(v) elif isinstance(v, list): for item in v: if isinstance(item, dict): normalize_doc(item) for doc in insert_data: normalize_doc(doc) try: if len(insert_data) == 1: result = coll.insert_one(insert_data[0]) inserted_ids = [str(result.inserted_id)] else: result = coll.insert_many(insert_data) inserted_ids = [str(_id) for _id in result.inserted_ids] return { "status": True, "msg": "插入成功", "data": { "inserted_count": len(inserted_ids), "inserted_ids": inserted_ids } } except Exception as e: return public.returnMsg(False, f"插入失败: {str(e)}") def ReplaceCollectionData(self, get): """ 完全替换指定条件匹配的单条文档(替换整个文档) 参数: - get.db_name: 数据库名 - get.collection_name: 集合名 - get.replace_data: JSON字符串,完整新文档(必须包含 _id) 返回: - 替换结果(匹配数和修改数) """ import json from bson import ObjectId from datetime import datetime db_name = get.db_name collection_name = get.collection_name status, coll_or_msg = self.GetCollection(db_name, collection_name) if not status: return public.returnMsg(False, coll_or_msg) coll = coll_or_msg if not hasattr(get, "replace_data") or not get.replace_data: return public.returnMsg(False, "缺少替换文档(replace_data)") try: if isinstance(get.replace_data, str): replace_data = json.loads(get.replace_data) else: replace_data = get.replace_data except Exception as e: return public.returnMsg(False, f"替换文档解析失败: {str(e)}") if not isinstance(replace_data, dict): return public.returnMsg(False, "替换文档必须是JSON对象") if "_id" not in replace_data: return public.returnMsg(False, "替换文档必须包含 _id 字段") _id = replace_data["_id"] if isinstance(_id, dict) and "$oid" in _id: replace_data["_id"] = ObjectId(_id["$oid"]) elif isinstance(_id, str): replace_data["_id"] = ObjectId(_id) else: pass def normalize_doc(doc): for k, v in doc.items(): if isinstance(v, dict): if "$date" in v: doc[k] = datetime.fromtimestamp(v["$date"] / 1000) else: normalize_doc(v) elif isinstance(v, list): for item in v: if isinstance(item, dict): normalize_doc(item) normalize_doc(replace_data) try: result = coll.replace_one({"_id": replace_data["_id"]}, replace_data) return { "status": True, "msg": "替换成功", "data": { "matched_count": result.matched_count, "modified_count": result.modified_count } } except Exception as e: return public.returnMsg(False, f"替换失败: {str(e)}") def GetRole(self, get): """ @获取所有角色权限 """ status, mongodb_obj = self.get_obj_by_sid(0) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new("admin") if status is False: return public.returnMsg(False, db_obj) # 获取所有角色 role_data = db_obj.command('rolesInfo', showBuiltinRoles=True) result = [] for role in role_data["roles"]: if self._MONGO_ROLE_DICT.get(role["role"]) is not None: role["name"] = self._MONGO_ROLE_DICT.get(role["role"]) result.append(role) return {"status": True, "msg": "ok", "data": result} def GetDatabaseAccess(self, get): """ @获取用户权限 @user_name: 用户名 """ user_name = get.get("user_name") if user_name is None: return public.returnMsg(False, '参数错误!缺少据库用户名称!') db_find = public.M("databases").where("username=? AND LOWER(type)=LOWER('MongoDB')", (user_name,)).find() if not db_find: return public.returnMsg(False, "数据库不存在!") if not public.process_exists("mongod") and not int(db_find["sid"]): return public.returnMsg(False, "Mongodb服务还未开启!") status, mongodb_obj = self.get_obj_by_sid(db_find["sid"]) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(user_name) if status is False: return public.returnMsg(False, db_obj) # 查看用户信息 user_data = db_obj.command('usersInfo', user_name) # 打印用户的权限信息 result = { "user": user_name, "db": user_name, "roles": [], } if user_data: if len(user_data["users"]) != 0: user = user_data["users"][0] result["user"] = user.get("user", user_name) result["db"] = user.get("db", user_name) result["roles"] = [info.get("role") for info in user.get("roles", []) if info.get("role")] return {"status": True, "msg": "ok", "data": result} def SetDatabaseAccess(self, get): """ @设置用户权限 @remote_ip: 远程连接地址 """ user_name = get.get("user_name", None) db_permission = get.get("db_permission", None) if user_name is None: return public.returnMsg(False, '参数错误!缺少数据库用户名!') if db_permission is None or not db_permission: return public.returnMsg(False, '请设置权限!') # if db_permission not in ["read","readWrite","dbAdmin","clusterAdmin","userAdmin","backup","restore","root"]: # return public.returnMsg(False, '数据库权限错误!') role_permission = [{"role": permission, "db": user_name} for permission in db_permission] db_find = public.M("databases").where("username=? AND LOWER(type)=LOWER('MongoDB')", (user_name,)).find() if not db_find: return public.returnMsg(False, "数据库不存在!") if not public.process_exists("mongod") and not int(db_find["sid"]): return public.returnMsg(False, "Mongodb服务还未开启!") status, mongodb_obj = self.get_obj_by_sid(db_find["sid"]) if status is False: return public.returnMsg(False, mongodb_obj) status, db_obj = mongodb_obj.get_db_obj_new(user_name) if status is False: return public.returnMsg(False, db_obj) try: user_data = db_obj.command('usersInfo', user_name) if user_data: if len(user_data["users"]) != 0: del_role_permission = [{"role": role.get("role"), "db": user_name} for role in user_data["users"][0].get("roles", [])] db_obj.command('revokeRolesFromUser', user_name, roles=del_role_permission) db_obj.command("grantRolesToUser", user_name, roles=role_permission) return public.returnMsg(True, f"{user_name} 授权成功!") except Exception as err: return public.returnMsg(False, f"授权失败!{err}")