# Easy Sqlite Toolkit # Author Zhj<2022-12-06> import typing import fcntl import os import sqlite3 import time import traceback import re import copy import weakref from functools import reduce from contextlib import contextmanager from .gcmanager import gc_enable, gc_disable from .tools import is_number from .exceptions import HintException, PanelError _BASE_DIR = '/www/server/panel' # 记录错误日志 def _log(e): if isinstance(e, HintException): return # 日志目录 log_dir = '{}/logs/sqlite_easy/{}'.format(_BASE_DIR, time.strftime('%Y%m')) # 确保目录已创建 if not os.path.exists(log_dir): os.makedirs(log_dir, 0o600) # 记录错误日志 with open('{}/{}.log'.format(log_dir, time.strftime('%d')), 'a') as fp: fp.write('[{}]{}\n'.format(time.strftime('%Y-%m-%d %X'), str(e))) # 如果是异常,打印异常堆栈信息 if isinstance(e, BaseException): fp.write('{}\n'.format(traceback.format_exc())) # 日志打印 def _log2(s, log_file='sqlite_client.log'): log_dir = '{}/logs'.format(_BASE_DIR) # 确保目录已创建 if not os.path.exists(log_dir): os.makedirs(log_dir, 0o600) with open('{}/{}'.format(log_dir, log_file), 'a') as fp: fp.write('[{}]{}\n'.format(time.strftime('%Y-%m-%d %X'), s)) def _to_tuple(p): ''' @name 将输入参数转为元组 @author Zhj<2022-07-16> @param p 输入参数 @return tuple ''' # 输入参数为元组时 直接返回 if isinstance(p, tuple): return p # 输入参数为列表时 转为元组 if isinstance(p, list): return tuple(p) # 其他情况 直接创建一个元组 并将参数放入其中 return (p,) # 正则表达式列表 # 匹配SQL字段名称的正则表达式 match_field_reg = re.compile(r'^(?:`?(\w+)`?\.)?`?(\w+)`?$', flags=re.IGNORECASE) # 匹配表名的正则表达式 match_table_name_reg = re.compile(r'^([\w`]+)(?:(?:\s+AS\s+|\s+)([\w`]+))?$', flags=re.IGNORECASE) # 匹配【?】号的正则表达式 search_question_reg = re.compile(r'\?') # 匹配查询字段名称的正则表达式 match_row_key_reg = re.compile(r'^(?:distinct\s+)?(?:[\w`]+\.)?([\w`]+)(?:(?:\s+AS\s+|\s+)([\w`]+))?$', flags=re.IGNORECASE) # 匹配查询字段别名的正则表达式 search_row_key_reg = re.compile(r'(?:\s+AS\s+|\s+)([\w`]+)$', flags=re.IGNORECASE) # 匹配SQL逻辑操作符【AND】【OR】的正则表达式 search_logic_opt_reg = re.compile(r'\s+(?:AND|OR)\s+', flags=re.IGNORECASE) # 匹配以【1 AND 】开头的WHERE语句的正则表达式 match_where_str_begin_reg = re.compile(r'^1\s+(?:AND|OR)\s+', flags=re.IGNORECASE) # 匹配引号【'】【"】的正则表达式 search_quote_reg = re.compile(r'(? @param field 字段名 @return string ''' return match_field_reg.sub(lambda m: '{}{}'.format( '' if m.group(1) is None else '`%s`.' % m.group(1), '`%s`' % m.group(2) ), str(field).strip()) def _format_cursor(cursor): ''' @name 将cursor对象转换为list对象 @author Zhj<2023-04-25> @param cursor 游标对象 @return list ''' gc_disable() cols = tuple(map(lambda x: x[0], cursor.description)) data = [] for row in cursor: d = {} i = 0 for col in cols: d[col] = row[i] i += 1 data.append(d) cursor.close() gc_enable() return data @contextmanager def _auto_repair_context(db_path): ''' @name 自动修复sqlite数据库的上下文环境 @author Zhj<2023-03-15> @param db_path sqlite数据库绝对路径 @return void ''' # os.chdir(_BASE_DIR) # sys.path.insert(0, _BASE_DIR) # # import core.include.public as public # path = os.getcwd() # monitor_backpath = path + '/backupDB/monitor_mgr_bak.db' # safety_backpath = path + '/backupDB/safety_bak.db' try: yield except BaseException as e: if str(db_path).startswith(':memory:'): raise e if str(e) in ['database disk image is malformed', 'file is not a database']: # TODO 修复数据库 # public.ExecShell('\cp -rf %s %s' % (monitor_backpath,db_path)) # public.ExecShell('\cp -rf %s %s' % (safety_backpath,db_path)) pass raise e class Where: ''' @name where条件收集类 @author Zhj<2022-07-16> ''' __slots__ = ['__WHERE_STR', '__BIND_PARAMS'] def __init__(self): self.__WHERE_STR = '1' self.__BIND_PARAMS = () # def __del__(self): # self.clear() def add(self, condition, bind_params=(), logic='AND'): ''' @name 添加where条件 @author Zhj<2022-07-16> @param condition where条件 @param bind_params 绑定参数 @param logic 逻辑运算符 AND|OR @return self ''' # 当检测condition为字段名且存在参数绑定时,自动添加"=?" if match_field_reg.match(condition) and len(_to_tuple(bind_params)) > 0: condition = '{} = ?'.format(_add_backtick_for_field(condition)) where_str = ' {} ({})' if search_logic_opt_reg.search(condition) else ' {} {}' self.__WHERE_STR += where_str.format(logic.upper(), str(condition)) self.__BIND_PARAMS += _to_tuple(bind_params) return self def add_where_in(self, field, vals, logic='AND', not_in=False): ''' @name 添加where IN查询条件 @param field 字段名 @param vals 查询条件 @param logic 逻辑运算符 AND|OR @param not_in 是否为NOT IN @return self ''' if isinstance(vals, str) or isinstance(vals, int) or isinstance(vals, float): vals = [vals] # 空列表 # (IN)构建where 0 # (NOT IN)构建where 1 if len(vals) == 0: self.__WHERE_STR += ' {} {}'.format(logic.upper(), 1 if not_in else 0) return self # 元组转列表 if isinstance(vals, tuple): vals = list(vals) # 去重 vals = list(set(vals)) # 构造where条件 tmp = [] where_params = () # 绑定参数过多时使用字符串拼接 is_to_more_vals = len(vals) > 300 if is_to_more_vals: for val in vals: if is_number(val): tmp.append(str(val)) continue tmp.append("'{}'".format(search_quote_reg.sub(r'\\\1', str(val)))) else: for val in vals: tmp.append('?') where_params += (val,) self.__WHERE_STR += ' {} {} {} ({})'.format( logic.upper(), _add_backtick_for_field(field), 'IN' if not not_in else 'NOT IN', ','.join(tmp) ) self.__BIND_PARAMS += where_params return self def build(self): ''' @name 获取where条件表达式和绑定参数 @author Zhj<2022-07-16> @return (where条件, 绑定参数) ''' where_str = match_where_str_begin_reg.sub('', self.__WHERE_STR) if len(where_str) == 0: return '', () return ' WHERE {}'.format(where_str), self.__BIND_PARAMS def clear(self): ''' @name 清空收集数据 @author Zhj<2022-07-16> @return self ''' self.__WHERE_STR = '1' self.__BIND_PARAMS = () return self class Limit: ''' @name limit条件收集类 @author Zhj<2022-07-16> ''' __slots__ = ['__LIMIT', '__SKIP'] def __init__(self): self.__LIMIT = None self.__SKIP = None # def __del__(self): # self.clear() def set_limit(self, limit): ''' @name 设置limit @author Zhj<2022-07-16> @param limit 查询行数 @return self ''' self.__LIMIT = int(limit) return self def set_skip(self, skip): ''' @name 设置skip @author Zhj<2022-07-16> @param skip 跳过的行数 @return self ''' self.__SKIP = int(skip) return self def build(self): ''' @name 获取limit条件表达式 @author Zhj<2022-17-16> @return string ''' if self.__LIMIT is None: return '' if self.__SKIP is None: return ' LIMIT {}'.format(str(self.__LIMIT)) return ' LIMIT {},{}'.format(str(self.__SKIP), str(self.__LIMIT)) def clear(self): ''' @name 清空收集数据 @author Zhj<2022-07-16> @return self ''' self.__LIMIT = None self.__SKIP = None return self class Order: ''' @name order排序条件收集类 @author ZHj<2022-07-16> ''' __slots__ = ['__ORDERS', '__BIND_PARAMS'] def __init__(self): self.__ORDERS = [] self.__BIND_PARAMS = () # def __del__(self): # self.clear() def add_order(self, field, ordering='ASC', params=()): ''' @name 添加排序条件 @author Zhj<2022-07-16> @param field 字段名 @param ordering 排序方式 ASC|DESC @param params 绑定参数 @return self ''' self.__ORDERS.append('{} {}'.format( _add_backtick_for_field(field), ordering.upper() )) self.__BIND_PARAMS += _to_tuple(params) return self def build(self): ''' @name 获取排序条件表达式和绑定参数 @author Zhj<2022-07-16> @return (排序条件表达式, 绑定参数) ''' if len(self.__ORDERS) == 0: return '', () return ' ORDER BY {}'.format(', '.join(self.__ORDERS)), self.__BIND_PARAMS def clear(self): ''' @name 清空收集数据 @author Zhj<2022-07-16> @return self ''' self.__ORDERS = [] self.__BIND_PARAMS = () return self class Field: ''' @name 查询字段收集类 @author Zhj<2022-07-16> ''' __slots__ = ['__FIELDS'] def __init__(self): self.__FIELDS = [] # def __del__(self): # self.clear() def set_fields(self, *fields): ''' @name 设置查询字段 @author Zhj<2022-07-17> @param fields 查询字段列表 @return self ''' self.__FIELDS = list( map(lambda field: _add_backtick_for_field(field), filter(lambda x: x is not None, fields) ) ) return self def add_fields(self, *fields): ''' @name 添加查询字段 @author Zhj<2022-07-16> @param fields 查询字段列表 @return self ''' self.__FIELDS += list( map(lambda field: _add_backtick_for_field(field), filter(lambda x: x is not None, fields) ) ) return self def build(self): ''' @name 获取查询字段列表 @author Zhj<2022-07-16> @return string ''' if len(self.__FIELDS) == 0: return '*' return ', '.join(list(set(self.__FIELDS))) def is_empty(self): ''' @name 检查是否为空 @author Zhj<2022-07-20> @return bool ''' return len(self.__FIELDS) == 0 def clear(self): ''' @name 清空收集数据 @author Zhj<2022-07-16> @return self ''' self.__FIELDS = [] return self class Group: ''' @name 分组条件收集类 @author Zhj<2022-07-16> ''' __slots__ = ['__GROUPS', '__BIND_PARAMS'] def __init__(self): self.__GROUPS = [] self.__BIND_PARAMS = () # def __del__(self): # self.clear() def add_group(self, condition, params=()): ''' @name 添加分组条件 @author Zhj<2022-07-16> @param condition 分组条件 @param params 绑定参数 @return self ''' self.__GROUPS.append(str(condition).strip()) self.__BIND_PARAMS += _to_tuple(params) return self def build(self): ''' @name 获取分组条件表达式和绑定参数 @author Zhj<2022-07-16> @return (分组条件表达式, 绑定参数) ''' if len(self.__GROUPS) == 0: return '', () return ' GROUP BY {}'.format(', '.join(self.__GROUPS)), self.__BIND_PARAMS def clear(self): ''' @name 清空收集数据 @author Zhj<2022-07-16> @return self ''' self.__GROUPS = [] self.__BIND_PARAMS = () return self class Having: ''' @name 分组筛选条件收集类 @author Zhj<2022-07-16> ''' __slots__ = ['__HAVINGS', '__BIND_PARAMS'] def __init__(self): self.__HAVINGS = [] self.__BIND_PARAMS = () # def __del__(self): # self.clear() def add_having(self, condition, params=()): ''' @name 添加分组筛选条件 @author Zhj<2022-07-16> @param condition 分组筛选条件 @param params 绑定参数 @return self ''' self.__HAVINGS.append(str(condition).strip()) self.__BIND_PARAMS += _to_tuple(params) return self def build(self): ''' @name 获取分组筛选条件表达式和绑定参数 @author Zhj<2022-07-16> @return (分组筛选条件表达式, 绑定参数) ''' if len(self.__HAVINGS) == 0: return '', () return ' HAVING {}'.format(', '.join(self.__HAVINGS)), self.__BIND_PARAMS def clear(self): ''' @name 清空收集数据 @author Zhj<2022-07-16> @return self ''' self.__HAVINGS = [] self.__BIND_PARAMS = () return self class Join: ''' @name 关联条件收集类 @author Zhj<2022-07-16> ''' __slots__ = ['__JOINS'] def __init__(self): self.__JOINS = [] # def __del__(self): # self.clear() def add_join(self, expression, condition, join_type='INNER', table_prefix=''): ''' @name 添加关联条件 @author Zhj<2022-07-16> @param expression 表达式 @param condition 关联条件 @param join_type 关联方式 INNER|LEFT|RIGHT @param table_prefix 表前缀 @return self ''' m = match_table_name_reg.match(expression) if m: expression = '{}{}'.format( _add_backtick_for_field('{}{}'.format(table_prefix, m.group(1).strip('`'))), '' if m.group(2) is None else ' AS {}'.format(_add_backtick_for_field(m.group(2))) ) self.__JOINS.append('{} JOIN {} ON {}'.format( join_type.upper(), str(expression).strip(), str(condition).strip() )) return self def build(self): ''' @name 获取关联条件表达式 @author Zhj<2022-07-16> @return string ''' if len(self.__JOINS) == 0: return '' return ' ' + ' '.join(self.__JOINS) def clear(self): ''' @name 清空收集数据 @author Zhj<2022-07-16> @return self ''' self.__JOINS = [] return self class Update: ''' @name Update条件 @author Zhj<2022-07-17> ''' __slots__ = ['__UPDATES', '__BIND_PARAMS'] def __init__(self): self.__UPDATES = [] self.__BIND_PARAMS = () # def __del__(self): # self.clear() def add(self, field, value): ''' @name 添加更新条件 @author Zhj<2022-07-17> @param field 字段名 @param value 值 @return self ''' self.__UPDATES.append('{} = ?'.format(_add_backtick_for_field(field))) self.__BIND_PARAMS += _to_tuple(value) return self def increment(self, field, step=1): ''' @name 自增 @author Zhj<2022-07-17> @param field 字段名 @param step 值 @return self ''' self.__UPDATES.append('{field} = {field} + {step}'.format( field=_add_backtick_for_field(field), step=str(int(step)) )) return self def decrement(self, field, step=1): ''' @name 自减 @author Zhj<2022-07-17> @param field 字段名 @param step 值 @return self ''' self.__UPDATES.append('{field} = {field} - {step}'.format( field=_add_backtick_for_field(field), step=str(int(step)) )) return self def exp(self, field, exp): ''' @name 添加原生表达式 @author Zhj<2022-12-08> @param field 字段名 @param exp 原生表达式 @return: ''' self.__UPDATES.append('{field} = {exp}'.format( field=_add_backtick_for_field(field), exp=str(exp) )) return self def build(self): ''' @name 获取更新表达式和绑定参数 @author Zhj<2022-07-17> @return self ''' if self.is_empty(): return '', () return ', '.join(self.__UPDATES), self.__BIND_PARAMS def is_empty(self): ''' @name 检查update条件是否为空 @author Zhj<2022-07-17> @return bool ''' return len(self.__UPDATES) == 0 def clear(self): ''' @name 清空收集数据 @author Zhj<2022-07-17> @return self ''' self.__UPDATES = [] self.__BIND_PARAMS = () return self class Duplicate(Update): ''' @name Duplicate条件 @author Zhj<2024-12-19> ''' def __init__(self): Update.__init__(self) def build(self): if self.is_empty(): return '', () raw_sql, binds = Update.build(self) return ' ON CONFLICT DO UPDATE SET ' + raw_sql, binds class AlterTable: __slots__ = ['__weakref__', '__QUERY', '__COLUMNS', '__RENAME_TABLE', '__ALTERS'] def __init__(self, query): if not isinstance(query, SqliteEasy): raise RuntimeError('参数query必须是一个SqliteEasy类型') self.__QUERY = weakref.proxy(query) self.__COLUMNS = [] self.__RENAME_TABLE = None self.__ALTERS = [] # def __del__(self): # self.clear() # self.__COLUMNS = [] # self.__ALTERS = [] # self.__QUERY = None def rename_table(self, new_table_name): ''' @name 更新表名 @author Zhj<2022-09-21> @param new_table_name 新表名 @return self ''' self.__RENAME_TABLE = ' RENAME TO {}'.format(_add_backtick_for_field(new_table_name)) return self def rename_column(self, col_name, new_col_name): ''' @name 更新字段名 @author Zhj<2022-09-21> @param col_name 当前字段名 @param new_col_name 新字段名 @return self ''' if self.__column_exists(col_name): self.__ALTERS.append(' RENAME COLUMN {} TO {}'.format(_add_backtick_for_field(col_name), _add_backtick_for_field(new_col_name))) self.__COLUMNS.remove(col_name) self.__COLUMNS.append(new_col_name) return self def add_column(self, col_name, prop, force=False): ''' @name 新增字段 @author Zhj<2022-09-21> @param col_name 字段名 @param prop 字段属性 @param force 是否强制新增(删除旧的字段)[可选] @return self ''' if force: self.drop_column(col_name) if not self.__column_exists(col_name): self.__ALTERS.append(' ADD COLUMN {} {}'.format(_add_backtick_for_field(col_name), prop)) self.__COLUMNS.append(col_name) return self def drop_column(self, col_name): ''' @name 删除字段 @author Zhj<2022-09-21> @param col_name 字段名 @return self ''' if self.__column_exists(col_name): self.__ALTERS.append(' DROP COLUMN {}'.format(_add_backtick_for_field(col_name))) self.__COLUMNS.remove(col_name) return self def __column_exists(self, col_name): ''' @name 检查字段是否已经存在 @author Zhj<2022-09-21> @param col_name 字段名 @return bool ''' if len(self.__COLUMNS) == 0: self.__COLUMNS = self.__QUERY.get_columns() return col_name in self.__COLUMNS def build(self, table_name): ''' @name 构建语句 @author Zhj<2022-09-21> @param table_name 表名 @return string|None ''' if self.is_empty(): return None ret = "begin;\n" ret += "\n".join(list(map(lambda x: 'ALTER TABLE {}{};'.format(table_name, x), self.__ALTERS))) ret += "ALTER TABLE {}{};\n".format(table_name, self.__RENAME_TABLE) if self.__RENAME_TABLE is not None else "\n" ret += 'commit;' return ret def is_empty(self): ''' @name 检查更新条件是否为空 @author Zhj<2022-09-21> @return bool ''' return self.__RENAME_TABLE is None and len(self.__ALTERS) == 0 def clear(self): ''' @name 清空收集数据 @author Zhj<2022-09-21> @return self ''' self.__ALTERS = [] return self class DbConnection: ''' @name Sqlite数据库连接类(相比Db类更加底层) @author Zhj<2022-12-13> ''' __slots__ = ['__DB_NAME', '__DB_PATH', '__DB_LOCK_FILE', '__CONN', '__DEBUG_LOG'] def __init__(self, db_name): ''' @name 初始化函数 @author Zhj<2022-12-14> @param db_name 数据库名称(全路径 不包含.db) @return void ''' self.__DB_NAME = db_name self.__DB_PATH = '{}.db'.format(db_name) if str(db_name).startswith(':memory:'): self.__DB_PATH = ':memory:' # 数据库连接对象 self.__CONN: typing.Optional[sqlite3.Connection] = None # 数据库并发文件锁 self.__DB_LOCK_FILE = None if self.__DB_PATH != ':memory:': self.__DB_LOCK_FILE = '/tmp/aap_locks/{}.lock'.format(self.__DB_PATH.replace('/', '____')) dirname = os.path.dirname(self.__DB_LOCK_FILE) if not os.path.exists(dirname): os.makedirs(dirname, 0o775) if not os.path.exists(self.__DB_LOCK_FILE): with open(self.__DB_LOCK_FILE, 'ab'): pass # 连接数据库 self.connect() # # 析构函数 # def __del__(self): # try: # # 关闭数据库连接 # self.close() # except BaseException as e: # _log(e) # 数据库读操作并发锁 @contextmanager def __rlock(self): if self.__DB_LOCK_FILE is not None: with open(self.__DB_LOCK_FILE, 'rb') as fp: fcntl.flock(fp.fileno(), fcntl.LOCK_SH) yield else: yield # 数据库写操作并发锁 @contextmanager def __wlock(self): if self.__DB_LOCK_FILE is not None: with open(self.__DB_LOCK_FILE, 'rb+') as fp: fcntl.flock(fp.fileno(), fcntl.LOCK_EX) yield else: yield # 获取sqlite连接对象 def conn_obj(self) -> sqlite3.Connection: ''' @name 获取sqlite数据库连接对象 @author Zhj<2022-12-22> @return sqlite3.Connection|None ''' return self.__CONN # 连接sqlite def connect(self): if isinstance(self.__CONN, sqlite3.Connection): return self.__CONN # 连接数据库(写) self.__CONN = sqlite3.connect(self.__DB_PATH, timeout=15, check_same_thread=False) self.__CONN.text_factory = str self.__CONN.isolation_level = 'IMMEDIATE' # 关闭连接 def close(self): ''' @name 关闭sqlite连接 ''' # 关闭sqlite数据库连接 if isinstance(self.__CONN, sqlite3.Connection): try: self.__CONN.close() except BaseException as e: _log(e) # 开启事务(sqlite自动开启,无需手动调用) def start_transaction(self): ''' @name 开启事务 @return bool ''' return True # 提交事务 def commit(self) -> bool: ''' @name 提交事务 @return bool ''' if isinstance(self.__CONN, sqlite3.Connection) and self.__CONN.in_transaction: self.__CONN.commit() return True return False # 回滚事务 def rollback(self) -> bool: ''' @name 回滚事务 @return bool ''' if isinstance(self.__CONN, sqlite3.Connection) and self.__CONN.in_transaction: self.__CONN.rollback() return True return False # 重试辅助函数 def __sqlite_retry_help(self, fn, *args, **kwargs): retries = 25 # 尝试最大次数 retry_interval = 20 # 每次尝试间隔时间/ms while retries > 0: try: return fn(*args, **kwargs) except ( SystemError, KeyError, sqlite3.InterfaceError, sqlite3.InternalError, sqlite3.OperationalError) as e: # 数据库操作错误,不是锁协议错误,直接抛出异常 if isinstance(e, sqlite3.OperationalError) and str(e) not in ['locking protocol', 'database is locked']: raise e # 最后一次尝试失败后直接raise异常 if retries == 1: raise e # 打印异常信息 _log(e) e = None del (e,) time.sleep(retry_interval * 0.001) finally: retries -= 1 # 查询 def query(self, sql, params=(), take_first=False): ''' @name 执行查询SQL @param sql sql语句 @param params 绑定参数[可选] @param take_first 是否只获取一行数据[可选 默认获取所有行] @return list|dict|None ''' # 读锁 with self.__rlock(): s_time = time.time() e_time = s_time try: ret = self.__sqlite_retry_help(self.__query_help, sql, params, take_first) e_time = time.time() return ret finally: if e_time - s_time > 1: _log2('{}s {}'.format(round(e_time - s_time, 2), reduce(lambda x, y: search_question_reg.sub( str(y) if is_number(y) else "'%s'" % y, x, 1), params, sql)), 'sqlite_slow.log') # 查询(辅助函数) def __query_help(self, sql, params=(), take_first=False): with _auto_repair_context(self.__DB_PATH): try: # 执行SQL # 获取游标对象 cur = self.__CONN.execute(sql, _to_tuple(params)) except: # 打印sql语句与绑定参数 _log('{}\nbindings: {}\ndb: {}'.format(sql, params, self.__DB_NAME)) cur = None del (cur,) # 抛出异常 raise try: s_time = time.time() ret = _format_cursor(cur) e_time = time.time() if e_time - s_time > 1: _log2('{}s format_cursor'.format(round(e_time - s_time, 2)), 'sqlite_slow.log') del (cur,) # 只获取首行数据 if take_first: if len(ret) == 0: return None return ret[0] return list(ret) except: # 打印sql语句与绑定参数 _log('{}\nbindings: {}\ndb: {}'.format(sql, params, self.__DB_NAME)) cur = None del (cur,) raise # 执行单条SQL语句 def execute(self, sql, params=(), get_rowid=False): ''' @name 执行SQL @param sql sql语句 @param params 绑定参数 @param get_rowid 获取插入ID @return int 影响行数或插入ID ''' # 写锁 with self.__wlock(): return self.__sqlite_retry_help(self.__execute_help, sql, params, get_rowid) # 执行单条SQL语句(辅助函数) def __execute_help(self, sql, params=(), get_rowid=False): with _auto_repair_context(self.__DB_PATH): try: # 获取游标对象 cur = self.__CONN.cursor() # 执行SQL cur.execute(sql, _to_tuple(params)) except: # 打印sql语句与绑定参数 _log('{}\nbindings: {}\ndb: {}'.format(sql, params, self.__DB_NAME)) cur = None del (cur,) # 抛出异常 raise try: # 获取新插入数据ID if get_rowid: return cur.lastrowid # 返回受影响的行数 return cur.rowcount finally: # 主动关闭Cursor cur.close() cur = None del (cur,) # 执行批量写入 def execute_many(self, sql, params=()): ''' @name 批量插入 @param sql sql语句 @param params 绑定参数 @return int 影响行数 ''' # 写锁 with self.__wlock(): return self.__sqlite_retry_help(self.__execute_many_help, sql, params) # 执行批量写入(辅助函数) def __execute_many_help(self, sql, params=()): with _auto_repair_context(self.__DB_PATH): try: # 获取游标对象 cur = self.__CONN.cursor() # 执行SQL cur.executemany(sql, params) except: # 打印sql语句与绑定参数 _log('{}\nbindings: {}\ndb: {}'.format(sql, params, self.__DB_NAME)) cur = None del (cur,) # 抛出异常 raise rowcount = cur.rowcount # 主动关闭Cursor cur.close() cur = None del (cur,) return rowcount # 执行多条SQL语句 def execute_script(self, sql): ''' @name 批量执行SQL @param sql sql语句集合 @return bool ''' # 写锁 with self.__wlock(): return self.__sqlite_retry_help(self.__execute_script_help, sql) # 执行多条SQL语句(辅助函数) def __execute_script_help(self, sql): with _auto_repair_context(self.__DB_PATH): try: # 获取游标对象 cur = self.__CONN.cursor() # 执行SQL cur.executescript(sql) except: # 打印sql语句与绑定参数 _log('{}\ndb: {}'.format(sql, self.__DB_NAME)) cur = None del (cur,) # 抛出异常 raise # 主动关闭Cursor cur.close() cur = None del (cur,) return True # 执行vacuum整理数据库空间 def vacuum(self): ''' @name 执行vaccum整理数据库空间 @author Zhj<2022-12-22> @return int ''' # 写锁 with self.__wlock(): return self.execute('vacuum') # 备份数据库 def backup(self, dest_conn) -> bool: ''' @name 将当前数据库备份到目标数据库 @author Zhj<2022-12-22> @param dest_conn 目标数据库连接对象 @return bool ''' if dest_conn.conn_obj() is None: raise PanelError('dest_conn not connect to sqlite.') # 写锁 with self.__wlock(): with _auto_repair_context(self.__DB_PATH): # 开始备份(全量备份) self.__CONN.backup(dest_conn.conn_obj()) return True # 导出数据库 def dump(self, dest_file: str, row_check_func: typing.Optional[callable] = None) -> bool: """ @name 导出数据库 @author Zhj<2024-08-08> @param dest_file 目标文件 @param row_check_func 语句行检查函数 row_check_func(row: str) -> bool @return bool """ # 写锁 with self.__wlock(): with open(dest_file, 'w') as fp: for row in self.__CONN.iterdump(): # 当传入了语句行检查函数时,执行检查函数 if row_check_func is not None and not row_check_func(row): continue # 导出数据行 fp.write(row + '\n') return True class Snapshot: __slots__ = ['pk', 'from_sub_query', 'table', 'prefix', 'alias', 'where', 'limit', 'order', 'field', 'group', 'having', 'join', 'update', 'duplicate'] class Db: ''' @name Sqlite数据库连接类 @author Zhj<2022-07-18> ''' __slots__ = ['__DB_NAME', '__DB_CONN', '__AUTO_COMMIT', '__AUTO_VACUUM', '__NEED_VACUUM', '__DEBUG_LOG', '__QUERIES'] __DB_ROOT_DIR = '{}/data/'.format(_BASE_DIR) def __init__(self, db_name): ''' @name 初始化函数 @author Zhj<2022-12-14> @param db_name 数据库名称 @param brandnew 是否开启一个全新连接 @return void ''' if str(db_name).startswith(':memory:'): self.__DB_NAME = db_name # 内存数据库 else: self.__DB_NAME = os.path.join(self.__DB_ROOT_DIR, re.sub(r'\.db$', '', db_name)) # 数据库名称(全路径 不带.db) self.__DB_CONN: typing.Optional[DbConnection] = None # 数据库连接对象 self.__AUTO_COMMIT = True # 是否自动提交事务(默认自动提交) self.__AUTO_VACUUM = False # 是否自动释放空间 self.__NEED_VACUUM = False # 事务提交后是否需要释放空间 self.__DEBUG_LOG = False # 是否开启调试日志 self.__QUERIES = [] # 查询构造器数量 self.__connect() def __del__(self): self.close() del (self.__DB_CONN,) def __enter__(self): return self def __exit__(self, exc_type, exc_value, exc_trackback): self.close() def __connect(self): ''' @name 连接Sqlite数据库 @author Zhj<2022-07-16> @return self ''' self.__DB_CONN = DbConnection(self.__DB_NAME) return self def close(self): ''' @name 将数据库连接返回连接池 @author Zhj<2022-07-16> @return self ''' try: # 释放数据库连接 if self.__DB_CONN is None: return self.__DB_CONN.close() self.__DB_CONN = None except BaseException as e: _log(e) def client(self): ''' @name 获取数据库连接对象 @return SqliteClient ''' return self.__DB_CONN def db_name(self): ''' @name 获取数据库名称 @author Zhj<2022-09-07> @return string ''' return self.__DB_NAME def auto_vacuum(self, auto_vacuum=True): ''' @name 设置是否自动释放空间 @author Zhj<2022-09-27> @param auto_vacuum 是否自动释放空间 @return self ''' self.__AUTO_VACUUM = auto_vacuum return self def need_vacuum(self, need_vacuum=True): ''' @name 设置是否需要清理空间 @author Zhj<2022-09-02> @param need_vacuum 是否需要清理空间 @return self ''' self.__NEED_VACUUM = need_vacuum return self def autocommit(self, autocommit=True): ''' @name 设置自动提交事务状态 @author Zhj<2022-07-17> @param autocommit 是否自动提交事务 @return self ''' self.__AUTO_COMMIT = autocommit return self def is_autocommit(self): ''' @name 是否自动提交事务 @return bool ''' return self.__AUTO_COMMIT def is_autovacuum(self): ''' @name 是否自动释放空间 @return bool ''' return self.__AUTO_VACUUM def commit(self): ''' @name 提交事务 @author Zhj<2022-07-17> @return bool ''' if self.__DB_CONN is None: return False # 记录commit语句执行开始时间 s_time = time.time() # 提交事务 ret = self.__DB_CONN.commit() # 写日志 if self.is_debug(): self.debug_log('commit', self.db_name(), time.time() - s_time) # 开启了自动释放空间时 # 释放空间 if self.__NEED_VACUUM and self.is_autovacuum(): self.__NEED_VACUUM = False self.vacuum() return ret def rollback(self): ''' @name 回滚事务 @author Zhj<2022-07-17> @return bool ''' if self.__DB_CONN is None: return False self.__NEED_VACUUM = False # 记录rollback语句执行开始时间 s_time = time.time() ret = self.__DB_CONN.rollback() # 写日志 if self.is_debug(): self.debug_log('rollback', self.db_name(), time.time() - s_time) return ret def query(self): ''' @name 获取查询构造器对象 @author Zhj<2022-07-18> @return SqliteEasy ''' self.__QUERIES.append(None) return SqliteEasy(self) def query_done(self): ''' @name 标记查询构造器已关闭 @author Zhj<2023-02-08> @return None ''' del (self.__QUERIES[-1:],) def queries(self): ''' @name 查看当前查询构造器数量 @author Zhj<2023-02-08> @return int ''' return len(self.__QUERIES) def vacuum(self): ''' @name 释放空间 @author Zhj<2022-12-22> @return int ''' # 记录vacuum语句执行开始时间 s_time = time.time() # 执行vacuum ret = self.__DB_CONN.vacuum() # 写日志 if self.is_debug(): self.debug_log('vacuum', self.db_name(), time.time() - s_time) return ret def backup(self, dest_db): ''' @name 备份数据库(>=py3.7) @author Zhj<2022-12-22> @param dest_db 目标数据库连接对象 @return bool ''' if not isinstance(dest_db, Db): raise PanelError('dest_db must a Db object.') # 记录执行备份开始时间 s_time = time.time() # 开始备份 ret = self.__DB_CONN.backup(dest_db.client()) # 写日志 if self.is_debug(): self.debug_log('backup to {}'.format(dest_db.db_name()), self.db_name(), time.time() - s_time) return ret def dump(self, dest_file: str, row_check_func: typing.Optional[callable] = None) -> bool: ''' @name 导出数据库(>=py3.7) @author Zhj<2024-08-08> @param dest_file 目标文件 @param row_check_func 语句行检查函数 row_check_func(row: str) -> bool @return bool ''' # 记录执行导出开始时间 s_time = time.time() # 开始导出 ret = self.__DB_CONN.dump(dest_file, row_check_func) # 写日志 if self.is_debug(): self.debug_log('dump to {}'.format(dest_file), self.db_name(), time.time() - s_time) return ret def debug(self, debug_state=True): ''' @name 设置调试 @param debug_state 调试状态 @return self ''' self.__DEBUG_LOG = debug_state return self def is_debug(self): ''' @name 检查当前是否开启了调试日志 @return bool ''' return self.__DEBUG_LOG def debug_log(self, content, db_name, cost_time): ''' @name 写查询日志 @author Zhj<2022-09-27> @param content sql语句 @param db_name 数据库名称 @param cost_time 执行耗时/s @return void ''' # 获取当前日期时间 cur_datetime = time.strftime('%Y-%m-%d %X') # sql查询日志目录 log_dir = '{}/logs/sql_log/{}'.format(_BASE_DIR, time.strftime('%Y%m')) # 目录不存在时创建 if not os.path.exists(log_dir): os.makedirs(log_dir, 0o600) with open('{}/{}.log'.format(log_dir, cur_datetime[8:10]), 'a') as fp: fp.write('[{}]{} {}ms {}\n'.format(cur_datetime, db_name, round(cost_time * 1000, 2), content)) def synchronous_off(self): ''' @name 关闭同步 @author Zhj<2023-02-17> @return self ''' self.__DB_CONN.execute_script('PRAGMA synchronous=0;') return self def synchronous_off_wal(self): ''' @name 设置WAL日志模式并关闭同步 @author Zhj<2023-02-17> @return self ''' self.__DB_CONN.execute_script('PRAGMA journal_mode=wal;\nPRAGMA synchronous=0;\nPRAGMA temp_store=memory;\nPRAGMA mmap_size=30000000000;') return self def integrity_check(self): ''' @name 检查数据库是否完整 @author Zhj<2023-03-15> @return bool ''' try: ret = self.__DB_CONN.query('PRAGMA integrity_check', take_first=True) return ret['integrity_check'] == 'ok' except: return False class SqliteEasy: ''' @name Sqlite查询类 @author Zhj<2022-07-15> ''' __slots__ = ['__weakref__', '__DB', '__DB_TABLE', '__FETCH_SQL', '__EXPLAIN', '__PK', '__OPT_PREFIX', '__OPT_ALIAS', '__OPT_WHERE', '__OPT_LIMIT', '__OPT_ORDER', '__OPT_FIELD', '__OPT_GROUP', '__OPT_HAVING', '__OPT_JOIN', '__OPT_UPDATE', '__OPT_DUPLICATE', '__OPT_ALTER_TABLE', '__FROM_SUB_QUERY', '__CONFLICT_OPTIONS'] def __init__(self, db: typing.Optional[Db] = None): self.__DB = None # 数据库对象 self.__DB_TABLE = None # 表名(包含前缀) self.__FETCH_SQL = False # 是否输出sql语句 self.__EXPLAIN = False # 分析sql语句 self.__PK = None # 主键字段名 self.__OPT_PREFIX = 'bt_' # 表前缀 self.__OPT_ALIAS = None # 表别名 self.__OPT_WHERE = Where() # where条件 self.__OPT_LIMIT = Limit() # limit条件 self.__OPT_ORDER = Order() # order条件 self.__OPT_FIELD = Field() # field条件 self.__OPT_GROUP = Group() # group条件 self.__OPT_HAVING = Having() # having条件 self.__OPT_JOIN = Join() # 联表条件 self.__OPT_UPDATE = Update() # update条件 self.__OPT_DUPLICATE = Duplicate() # Duplicate self.__OPT_ALTER_TABLE = AlterTable(self) # 更新表结构条件 self.__FROM_SUB_QUERY = False # 是否通过子查询 self.__CONFLICT_OPTIONS = ( 'ROLLBACK', # 回滚 'ABORT', # 撤销 'FAIL', # 抛出失败异常 'IGNORE', # 忽略 'REPLACE', # 覆盖 ) # 发生约束冲突时,额外的处理选项 if db is not None: self.__DB = db def __del__(self): self.close() self.__OPT_WHERE = None self.__OPT_LIMIT = None self.__OPT_ORDER = None self.__OPT_FIELD = None self.__OPT_GROUP = None self.__OPT_HAVING = None self.__OPT_JOIN = None self.__OPT_UPDATE = None self.__OPT_ALTER_TABLE = None del (self.__OPT_WHERE, self.__OPT_LIMIT, self.__OPT_ORDER, self.__OPT_FIELD, self.__OPT_GROUP, self.__OPT_HAVING, self.__OPT_JOIN, self.__OPT_UPDATE, self.__OPT_ALTER_TABLE,) def __enter__(self): return self def __exit__(self, exc_type, exc_value, exc_traceback): self.close() def set_db(self, db): ''' @name 设置数据库对象 @author Zhj<2022-07-18> @param db 数据库对象 @return self ''' if not isinstance(db, Db): raise PanelError('db must a instance of core.include.sqlite_server.Db') self.__DB = db return self def get_db(self): ''' @name 获取数据库对象 @author Zhj<2022-07-18> @return Db|None ''' return self.__DB def set_pk(self, pk): ''' @name 设置主键字段名 @author Zhj<2022-07-19> @param pk 主键字段名 @return self ''' self.__PK = pk return self def set_where_obj(self, where_obj): ''' @name 设置Where对象 @author Zhj<2022-07-19> @param where_obj Where对象 @return self ''' if not isinstance(where_obj, Where): raise PanelError('where_obj must a instance of core.include.sqlite_server.Where') self.__OPT_WHERE = where_obj return self def set_limit_obj(self, limit_obj): ''' @name 设置Limit对象 @author Zhj<2022-07-19> @param limit_obj Limit对象 @return self ''' if not isinstance(limit_obj, Limit): raise PanelError('limit_obj must a instance of core.include.sqlite_server.Limit') self.__OPT_LIMIT = limit_obj return self def set_order_obj(self, order_obj): ''' @name 设置Order对象 @author Zhj<2022-07-19> @param order_obj Order对象 @return self ''' if not isinstance(order_obj, Order): raise PanelError('order_obj must a instance of core.include.sqlite_server.Order') self.__OPT_ORDER = order_obj return self def set_field_obj(self, field_obj): ''' @name 设置Field对象 @author Zhj<2022-07-19> @param field_obj Field对象 @return self ''' if not isinstance(field_obj, Field): raise PanelError('field_obj must a instance of core.include.sqlite_server.Field') self.__OPT_FIELD = field_obj return self def set_group_obj(self, group_obj): ''' @name 设置Group对象 @author Zhj<2022-07-19> @param group_obj Group对象 @return self ''' if not isinstance(group_obj, Group): raise PanelError('group_obj must a instance of core.include.sqlite_server.Group') self.__OPT_GROUP = group_obj return self def set_having_obj(self, having_obj): ''' @name 设置Having对象 @author Zhj<2022-07-19> @param having_obj Having对象 @return self ''' if not isinstance(having_obj, Having): raise PanelError('having_obj must a instance of core.include.sqlite_server.Having') self.__OPT_HAVING = having_obj return self def set_join_obj(self, join_obj): ''' @name 设置Join对象 @author Zhj<2022-07-19> @param join_obj Join对象 @return self ''' if not isinstance(join_obj, Join): raise PanelError('join_obj must a instance of core.include.sqlite_server.Join') self.__OPT_JOIN = join_obj return self def set_update_obj(self, update_obj): ''' @name 设置Update对象 @author Zhj<2022-07-19> @param update_obj Update对象 @return self ''' if not isinstance(update_obj, Update): raise PanelError('update_obj must a instance of core.include.sqlite_server.Update') self.__OPT_UPDATE = update_obj return self def set_duplicate_obj(self, duplicate_obj): ''' @name 设置Duplicated对象 @author Zhj<2022-07-19> @param duplicate_obj Duplicate对象 @return self ''' if not isinstance(duplicate_obj, Duplicate): raise PanelError('duplicate_obj must a instance of core.include.sqlite_server.Duplicate') self.__OPT_DUPLICATE = duplicate_obj return self def set_alter_table_obj(self, alter_table_obj): ''' @name 设置AlterTable对象 @author Zhj<2022-07-19> @param alter_table_obj AlterTable对象 @return self ''' if not isinstance(alter_table_obj, AlterTable): raise PanelError('alter_table_obj must a instance of core.include.sqlite_server.AlterTable') self.__OPT_ALTER_TABLE = alter_table_obj return self def close(self): ''' @name 关闭游标并将数据库连接返回连接池 @author Zhj<2022-07-16> @return self ''' try: self.__DB.query_done() self.__DB = None # 清理查询条件 self.__clear() except: pass def autocommit(self, autocommit=True): ''' @name 设置自动提交事务状态 @author Zhj<2022-07-17> @param autocommit 是否自动提交事务 @return self ''' if self.__DB is not None: self.__DB.autocommit(autocommit) return self def commit(self): ''' @name 提交事务 @author Zhj<2022-07-17> @return bool ''' if self.__DB is None: return False # 提交事务 return self.__DB.commit() def rollback(self): ''' @name 回滚事务 @author Zhj<2022-07-17> @return bool ''' if self.__DB is None: return False # 回滚事务 return self.__DB.rollback() def vacuum(self): """ @name 释放空间 @author Zhj<2023-11-13> @return bool """ if self.__DB is None: return False return self.__DB.vacuum() def fetch_sql(self, fetch_sql=True): ''' @name 设置是否输入sql原生语句 @author Zhj<2022-07-18> @param fetch_sql 是否输入sql原生语句 @return self ''' self.__FETCH_SQL = fetch_sql return self def explain(self, explain=True): ''' @name 设置EXPLAIN @author Zhj<2022-07-20> @param explain 是否开启EXPLAIN @return self ''' self.__EXPLAIN = explain return self def prefix(self, prefix): ''' @name 设置表前缀 @author Zhj<2022-07-16> @param prefix 表前缀 @return self ''' self.__OPT_PREFIX = prefix return self def name(self, table_name): ''' @name 设置表名(不包含前缀) @author Zhj<2022-07-16> @param table_name 表名(不包含前缀) @return self ''' return self.table(table_name, False) def table(self, table_name, is_fullname=True): ''' @name 设置表名(包含前缀) @param table_name 表名 @param is_fullname 是否完整表名(不包含前缀) @return self ''' m = match_table_name_reg.match(table_name) if m is None or m.group(1) is None: raise PanelError('Invalid table name:{}'.format(table_name)) self.__DB_TABLE = m.group(1).strip('`') # 添加前缀 if not is_fullname and self.__OPT_PREFIX is not None: self.__DB_TABLE = self.__OPT_PREFIX + self.__DB_TABLE if m.group(2) is not None: self.__OPT_ALIAS = m.group(2).strip('`') # 重置主键字段名 self.__PK = None return self # 使用子查询 def from_sub_query(self, sub_query): # 当传入查询构造器对象时,将其转为SQL语句 if isinstance(sub_query, SqliteEasy): sub_query = sub_query.build_sql(True) m = search_subquery_reg.search(str(sub_query)) if m is None or m.group(1) is None: raise PanelError('Invalid sub-query:{}'.format(sub_query)) self.__DB_TABLE = m.group(1) if m.group(2) is not None: self.__OPT_ALIAS = m.group(2).strip('`') # 重置主键字段名 self.__PK = None self.__FROM_SUB_QUERY = True return self def alias(self, alias): ''' @name 设置表别名 @author Zhj<2022-07-16> @param alias 表别名 @return self ''' self.__OPT_ALIAS = alias return self def field(self, *fields): ''' @name 添加查询字段 @author Zhj<2022-07-15> @param fields 查询字段列表 @return self ''' self.__OPT_FIELD.add_fields(*fields) return self def join(self, exp, condition, join_type='INNER', add_table_prefix=True): ''' @name 添加关联条件 @author Zhj<2022-07-17> @param exp 表达式 @param condition 关联条件 @param join_type 关联方式 INNER|LEFT|RIGHT @param add_table_prefix 是否添加表前缀[可选 默认自动添加] @return self ''' table_prefix = '' if add_table_prefix and self.__OPT_PREFIX is not None: table_prefix = self.__OPT_PREFIX self.__OPT_JOIN.add_join(exp, condition, join_type, table_prefix) return self def inner_join(self, exp, condition, add_table_prefix=True): ''' @name 添加内连接关联条件 @param exp 表达式 @param condition 关联条件 @param add_table_prefix 是否添加表前缀[可选 默认自动添加] @return self ''' return self.join(exp, condition, 'INNER', add_table_prefix) def left_join(self, exp, condition, add_table_prefix=True): ''' @name 添加左连接关联条件 @author Zhj<2022-07-17> @param exp 表达式 @param condition 关联条件 @param add_table_prefix 是否添加表前缀[可选 默认自动添加] @return self ''' return self.join(exp, condition, 'LEFT', add_table_prefix) def right_join(self, exp, condition, add_table_prefix=True): ''' @name 添加右连接关联条件 @author Zhj<2022-07-17> @param exp 表达式 @param condition 关联条件 @param add_table_prefix 是否添加表前缀[可选 默认自动添加] @return self ''' return self.join(exp, condition, 'RIGHT', add_table_prefix) def where(self, condition, params=()): ''' @name 添加where条件 AND @author Zhj<2022-07-16> @param condition where条件 @param params 绑定参数 @return self ''' self.__OPT_WHERE.add(condition, params) return self def where_or(self, condition, params=()): ''' @name 添加where条件 OR @author Zhj<2022-07-16> @param condition where条件 @param params 绑定参数 @return self ''' self.__OPT_WHERE.add(condition, params, 'OR') return self def where_in(self, field, vals, logic='AND'): ''' @name 添加where条件 IN @author Zhj<2022-07-16> @param field 字段名 @param vals 查询参数列表 @param logic 逻辑运算符 AND|OR @return self ''' self.__OPT_WHERE.add_where_in(field, vals, logic) return self def where_not_in(self, field, vals, logic='AND'): ''' @name 添加where条件 IN @author Zhj<2022-07-16> @param field 字段名 @param vals 查询参数列表 @param logic 逻辑运算符 AND|OR @return self ''' self.__OPT_WHERE.add_where_in(field, vals, logic, True) return self # TODO 嵌套where @contextmanager def where_nest(self): yield self def group(self, condition, params=()): ''' @name 添加分组条件 @author Zhj<2022-07-17> @param condition 分组条件 @param params 绑定参数 @return self ''' self.__OPT_GROUP.add_group(condition, params) return self def having(self, condition, params=()): ''' @name 添加分组筛选条件 @author Zhj<2022-07-17> @param condition 分组筛选条件 @param params 绑定参数 @return self ''' self.__OPT_HAVING.add_having(condition, params) return self def order(self, field, ordering='ASC', params=()): ''' @name 添加排序条件 @author Zhj<2022-07-17> @param field 字段名 @param ordering 排序方式 ASC|DESC @param params 绑定参数 @return self ''' self.__OPT_ORDER.add_order(field, ordering, params) return self def limit(self, limit, skip=None): ''' @name 设置返回行数 @author Zhj<2022-07-16> @param limit 返回的行数 @param skip 跳过的行数 @return self ''' self.__OPT_LIMIT.set_limit(limit) if skip is not None: self.__OPT_LIMIT.set_skip(skip) return self def skip(self, skip): ''' @name 设置跳过的行数 @author Zhj<2022-07-16> @param skip 跳过的行数 @return self ''' self.__OPT_LIMIT.set_skip(skip) return self def query(self, raw_sql, params=(), take_first=False, clear_conditions=True): ''' @name 查询 @author Zhj<2022-07-17> @param raw_sql sql语句 @param params 绑定参数[可选] @param take_first 是否只获取一行数据[可选 默认获取所有行] @param clear_conditions 是否清空查询条件[可选 默认清空] @return list|dict|None ''' if self.__DB is None: return None # 记录语句执行开始时间 s_time = time.time() # 执行sql语句 ret = self.__DB.client().query(raw_sql, _to_tuple(params), take_first) # 写日志 if self.__DB.is_debug(): self.__DB.debug_log(self.__to_raw_sql(raw_sql, _to_tuple(params)), self.__DB.db_name(), time.time() - s_time) # 自动提交事务 # if self.__is_autocommit(): # self.commit() # 清空查询条件 if clear_conditions: self.__clear() return ret def execute(self, raw_sql, params=(), get_rowid=False, clear_conditions=True): ''' @name 执行一条sql语句并返回影响的行数 @author Zhj<2022-07-18> @param raw_sql sql语句 @param params 绑定参数[可选] @param get_rowid 获取插入ID[可选 默认返回影响的行数] @param clear_conditions 是否清空查询条件[可选 默认清空] @return integer ''' if self.__DB is None: return 0 # 记录语句执行开始时间 s_time = time.time() # 执行sql语句 ret = self.__DB.client().execute(raw_sql, _to_tuple(params), get_rowid) # 写日志 if self.__DB.is_debug(): self.__DB.debug_log(self.__to_raw_sql(raw_sql, _to_tuple(params)), self.__DB.db_name(), time.time() - s_time) # 自动提交事务 if self.__is_autocommit(): self.commit() # 清空查询条件 if clear_conditions: self.__clear() return ret def execute_script(self, sql_script, clear_conditions=True): ''' @name 执行多条sql语句(注意:执行这个方法会自动提交之前的事务,本次执行不会加入事务,事务请编写在sql脚本中) @author Zhj<2022-07-18> @param sql_script sql语句 @param clear_conditions 是否清空查询条件[可选 默认清空] @return bool ''' if self.__DB is None: return False # 记录语句执行开始时间 s_time = time.time() # 执行sql语句 self.__DB.client().execute_script(sql_script) # 写日志 if self.__DB.is_debug(): self.__DB.debug_log(sql_script, self.__DB.db_name(), time.time() - s_time) # 清空查询条件 if clear_conditions: self.__clear() return True def insert(self, data, option=None, get_rowid=True): ''' @name 插入一条数据 @author Zhj<2022-07-17> @param data 插入数据 @param option 额外选项[可选] @param get_rowid 获取插入ID[可选 默认返回影响的行数] @return integer|string ''' if self.__DB_TABLE is None: raise PanelError('Insert failed: table name not provide.') # 传入dict类型之外的情况 if not isinstance(data, dict): # list类型:尝试insert_all if isinstance(data, list): return self.insert_all(data, option=option) # 其它类型:抛出异常提示 raise PanelError('Insert failed: parameter "data" type must be dict.') # 检查冲突选项是否正确 if option is not None and str(option).upper() not in self.__CONFLICT_OPTIONS: raise PanelError('option must be one of: {}'.format(', '.join(self.__CONFLICT_OPTIONS))) placeholders = [] ks = data.keys() params = () for k in ks: placeholders.append('?') params += (data[k],) # build deplicate sql and bind params duplicate_sql, duplicate_binds = self.__OPT_DUPLICATE.build() params += duplicate_binds raw_sql = 'INSERT{} INTO {} ({}) VALUES ({}){}'.format( ' OR {}'.format(str(option).upper()) if option is not None else '', _add_backtick_for_field(self.__DB_TABLE), ', '.join(list(map(lambda x: _add_backtick_for_field(x), ks))), ','.join(placeholders), duplicate_sql ) # 输出sql原生语句 if self.__FETCH_SQL: return self.__to_raw_sql(raw_sql, params) # 输出SQL语句分析信息 if self.__EXPLAIN: return self.explain_raw_sql(raw_sql, params) # 执行sql语句 return self.execute(raw_sql, params, get_rowid) def insert_all(self, data_list, clear_conditions=True, option=None): ''' @name 批量插入数据 @author Zhj<2022-07-17> @param data_list 批量插入数据 @param clear_conditions 是否清空查询条件[可选 默认清空] @param option 额外选项[可选] @return integer ''' if self.__DB is None or len(data_list) == 0: return 0 # 检查冲突选项是否正确 if option is not None and str(option).upper() not in self.__CONFLICT_OPTIONS: raise PanelError('option must be one of: {}'.format(', '.join(self.__CONFLICT_OPTIONS))) ks = data_list[0].keys() # build deplicate sql and bind params duplicate_sql, duplicate_binds = self.__OPT_DUPLICATE.build() # 生成sql语句 raw_sql = 'INSERT{} INTO {} ({}) VALUES ({}){}'.format( ' OR {}'.format(str(option).upper()) if option is not None else '', _add_backtick_for_field(self.__DB_TABLE), ', '.join(list(map(lambda x: _add_backtick_for_field(x), ks))), ','.join(list(map(lambda x: '?', ks))), duplicate_sql ) # 绑定参数 params = list(map(lambda x: _to_tuple(list(map(lambda y: x[y], ks))), data_list)) params += duplicate_binds # 记录语句执行开始时间 s_time = time.time() # 执行sql语句 ret = self.__DB.client().execute_many(raw_sql, params) # 写日志 if self.__DB.is_debug(): # 构造一个真实的批量写入sql debug_raw_sql = raw_sql i = len(params) - 1 pad_str = ', ({})'.format(','.join(list(map(lambda x: '?', ks)))) while i: debug_raw_sql += pad_str i -= 1 # 将绑定参数扁平化 debug_params = () for p in params: debug_params += p self.__DB.debug_log(self.__to_raw_sql(debug_raw_sql, _to_tuple(debug_params)), self.__DB.db_name(), time.time() - s_time) # 自动提交事务 if self.__is_autocommit(): self.commit() # 清空查询条件 if clear_conditions: self.__clear() # 返回新增的行数 return ret def increment(self, field, step=1): ''' @name 自增数值 @author Zhj<2022-07-17> @param field 字段名 @param step 值 @return self ''' self.__OPT_UPDATE.increment(field, step) return self def decrement(self, field, step=1): ''' @name 自减数值 @author Zhj<2022-07-17> @param field 字段名 @param step 值 @return self ''' self.__OPT_UPDATE.decrement(field, step) return self def exp(self, field, exp): ''' @name 使用原生表达式更新 @param field 字段名 @param exp 原生表达式 @return self ''' self.__OPT_UPDATE.exp(field, exp) return self def update(self, data=None): ''' @name 更新表数据 @author Zhj<2022-07-17> @param data 更新数据 @return integer|string ''' if data is not None: # 更新数据不是字典类型 返回0 if not isinstance(data, dict): return 0 for k, v in data.items(): self.__OPT_UPDATE.add(k, v) # 更新条件为空 返回0 if self.__OPT_UPDATE.is_empty(): return 0 update_str, update_params = self.__OPT_UPDATE.build() join_str = self.__OPT_JOIN.build() where_str, where_params = self.__OPT_WHERE.build() limit_str = self.__OPT_LIMIT.build() raw_sql = 'UPDATE {table_name}{join_condition} SET {exprission}' \ '{where_condition}{limit_condition}'.format_map({ 'table_name': self.__build_table_name(True), 'join_condition': join_str, 'exprission': update_str, 'where_condition': where_str, 'limit_condition': limit_str, }) bind_params = update_params + where_params # 输出原生sql语句 if self.__FETCH_SQL: return self.__to_raw_sql(raw_sql, bind_params) # 输出SQL语句分析信息 if self.__EXPLAIN: return self.explain_raw_sql(raw_sql, bind_params) # 执行sql语句 return self.execute(raw_sql, bind_params) def delete(self): ''' @name 删除表数据 @author Zhj<2022-07-17> @return integer|string ''' join_str = self.__OPT_JOIN.build() where_str, where_params = self.__OPT_WHERE.build() limit_str = self.__OPT_LIMIT.build() raw_sql = 'DELETE FROM {table_name}{join_condition}{where_condition}{limit_condition}'.format( table_name=self.__build_table_name(True), join_condition=join_str, where_condition=where_str, limit_condition=limit_str ) # 输出原生sql语句 if self.__FETCH_SQL: return self.__to_raw_sql(raw_sql, where_params) # 输出SQL语句分析信息 if self.__EXPLAIN: return self.explain_raw_sql(raw_sql, where_params) # 执行sql语句 ret = self.execute(raw_sql, where_params) # 自动提交事务时 # 且开启了自动释放空间时 # 释放空间 if self.__is_autocommit() and self.__is_autovacuum(): self.__DB.vacuum() # 否则 # 记录本次事务提交后需要释放空间 else: self.__DB.need_vacuum() return ret def find(self): ''' @name 查询一行数据 @author Zhj<2022-07-17> @return dict|None|string ''' self.__OPT_LIMIT.set_limit(1) # 构建sql语句和绑定参数 raw_sql, bind_params = self.__build_sql() # 输出原生sql语句 if self.__FETCH_SQL: return self.__to_raw_sql(raw_sql, bind_params) # 输出SQL语句分析信息 if self.__EXPLAIN: return self.explain_raw_sql(raw_sql, bind_params) return self.query(raw_sql, bind_params, True) def select(self): ''' @name 查询多行数据 @author Zhj<2022-07-17> @return list|None|string ''' # 构建sql语句和绑定参数 raw_sql, bind_params = self.__build_sql() # 输出原生sql语句 if self.__FETCH_SQL: return self.__to_raw_sql(raw_sql, bind_params) # 输出SQL语句分析信息 if self.__EXPLAIN: return self.explain_raw_sql(raw_sql, bind_params) return self.query(raw_sql, bind_params) def value(self, field): ''' @name 获取某个字段的值 @author Zhj<2022-07-17> @param field 字段名 @return string|integer|None ''' self.__OPT_FIELD.set_fields(field) ret = self.find() # 输出原生sql语句或SQL语句分析信息 if self.__FETCH_SQL or self.__EXPLAIN: return ret if ret is None: return None return ret.get(self.__get_row_key(field), None) def column(self, field, dict_key=None): ''' @name 获取指定字段的值列表或字典 @author Zhj<2022-07-17> @param field 字段名 @param dict_key 字典键(字段名) @return list|dict|string ''' if self.__OPT_FIELD.is_empty() and field is not None: self.__OPT_FIELD.set_fields(*filter(lambda x: x is not None, [field, dict_key])) ret = self.select() # 输出原生sql语句或SQL语句分析信息 if self.__FETCH_SQL or self.__EXPLAIN: return ret if ret is None: # 指定了键时 返回字典 if dict_key is not None: return {} return [] # 返回列表 if dict_key is None: return list(map(lambda x: x.get(self.__get_row_key(field), None), ret)) # 返回字典 d = {} f_k = None if field is None else self.__get_row_key(field) d_k = self.__get_row_key(dict_key) for item in ret: k = item.get(d_k, None) if k is None: continue d[k] = item if f_k is None else item.get(f_k, None) return d def count(self): ''' @name 统计行数 @author Zhj<2022-07-17> @return integer ''' ret = self.value('COUNT(*)') # 输出原生sql语句或SQL语句分析信息 if self.__FETCH_SQL or self.__EXPLAIN: return ret if ret is None: ret = 0 return int(ret) def avg(self, field, precsicion=None): ''' @name 统计平均值 @author Zhj<2022-07-17> @param field 字段名 @param precision 小数点精度 @return float ''' field = 'AVG({})'.format(_add_backtick_for_field(field)) if precsicion is not None and is_number(precsicion): field = 'ROUND({},{})'.format(field, precsicion) ret = self.value(field) # 输出原生sql语句或SQL语句分析信息 if self.__FETCH_SQL or self.__EXPLAIN: return ret if ret is None: ret = 0 return float(ret) def sum(self, field, precsicion=None): ''' @name 统计总和 @author Zhj<2022-07-17> @param field 字段名 @param precision 小数点精度 @return integer ''' field = 'SUM({})'.format(_add_backtick_for_field(field)) if precsicion is not None and is_number(precsicion): field = 'ROUND({},{})'.format(field, precsicion) ret = self.value(field) # 输出原生sql语句或SQL语句分析信息 if self.__FETCH_SQL or self.__EXPLAIN: return ret if ret is None: ret = 0 return int(ret) def exists(self): ''' @name 检查数据是否存在 @author Zhj<2022-07-18> @return bool ''' self.__OPT_LIMIT.set_limit(1) # 构建sql语句和绑定参数 raw_sql, bind_params = self.__build_sql() k = 'bt__exists' raw_sql = 'SELECT EXISTS({}) AS `{}`'.format(raw_sql, k) # 输出原生sql语句 if self.__FETCH_SQL: return self.__to_raw_sql(raw_sql, bind_params) # 输出SQL语句分析信息 if self.__EXPLAIN: return self.explain_raw_sql(raw_sql, bind_params) # 执行sql语句 ret = self.query(raw_sql, bind_params, True) if ret is None or not isinstance(ret, dict): return False return True if int(ret.get(k, 0)) == 1 else False # 设置insert时唯一索引重复时的更新操作 def duplicate(self, update: typing.Dict[str, str]): self.__OPT_DUPLICATE.clear() for k, v in update.items(): self.__OPT_DUPLICATE.exp(k, v) return self # TODO 强制索引 def force_index(self, index_name: str): pass def build_sql(self, sub_query=False, alias=None): ''' @name 构建sql查询语句(合并绑定参数) @author Zhj<2022-07-17> @param sub_query 是否为子查询 @return string ''' raw_sql = self.__to_raw_sql(*self.__build_sql()) # 子查询 if sub_query: raw_sql = '(%s)' % raw_sql # 别名 if alias is not None: raw_sql = '%s AS %s' % (raw_sql, _add_backtick_for_field(alias)) return raw_sql def get_pk(self): ''' @name 获取主键字段名 @author Zhj<2022-07-18> @return string|None ''' if self.__PK is not None: return self.__PK ret = self.query('PRAGMA TABLE_INFO({})'.format(self.__build_table_name(False)), take_first=True, clear_conditions=False) if ret is None: return None self.__PK = ret.get('name', None) return self.__PK def get_columns(self): ''' @name 获取所有列名 @author Zhj<2022-09-21> @return list ''' ret = self.query('PRAGMA TABLE_INFO({})'.format(self.__build_table_name(False)), clear_conditions=False) return [column['name'] for column in ret] def add_column(self, col_name, prop, force=False): ''' @name 新建字段 @author Zhj<2022-09-21> @param col_name 字段名 @param prop 字段属性 @param force 是否强制新增(删除旧的字段)[可选] @return self ''' self.__OPT_ALTER_TABLE.add_column(col_name, prop, force) return self def drop_column(self, col_name): ''' @name 删除字段 @author Zhj<2022-09-21> @param col_name 字段名 @return self ''' self.__OPT_ALTER_TABLE.drop_column(col_name) return self def rename_column(self, col_name, new_col_name): ''' @name 更新字段名 @author Zhj<2022-09-21> @param col_name 当前字段名 @param new_col_name 新字段名 @return self ''' self.__OPT_ALTER_TABLE.rename_column(col_name, new_col_name) return self def alter_table(self): ''' @name 更新表结构 @author Zhj<2022-09-21> @return bool ''' if self.__OPT_ALTER_TABLE.is_empty(): return False raw_sql = self.__OPT_ALTER_TABLE.build(self.__build_table_name(False)) self.execute_script(raw_sql) return True def add_index(self, idx_name, idx_col): ''' @name 创建索引 @author Zhj<2022-10-11> @param idx_name 索引名称 @param idx_col 字段名称 @return self ''' if not isinstance(idx_col, list): idx_col = [idx_col] self.execute('CREATE INDEX IF NOT EXISTS {} ON {} ({})'.format( _add_backtick_for_field(idx_name), self.__build_table_name(False), ','.join(list(map(lambda x: _add_backtick_for_field(x), idx_col))) )) return self def drop_index(self, idx_name): ''' @name 删除索引 @author Zhj<2022-10-11> @param idx_name 索引名称 @return self ''' self.execute('DROP INDEX IF EXISTS {} ON {}'.format( _add_backtick_for_field(idx_name), self.__build_table_name(False) )) return self def fork(self): ''' @name 克隆一个查询构造器 @author Zhj<2022-07-19> @return SqliteEasy|None ''' if self.__DB is None: return None query = self.__DB.query() query.set_pk(self.__PK) if self.__FROM_SUB_QUERY: query.from_sub_query(self.__DB_TABLE) else: query.table(self.__DB_TABLE) query.prefix(self.__OPT_PREFIX) query.alias(self.__OPT_ALIAS) query.set_where_obj(copy.deepcopy(self.__OPT_WHERE)) query.set_limit_obj(copy.deepcopy(self.__OPT_LIMIT)) query.set_order_obj(copy.deepcopy(self.__OPT_ORDER)) query.set_field_obj(copy.deepcopy(self.__OPT_FIELD)) query.set_group_obj(copy.deepcopy(self.__OPT_GROUP)) query.set_having_obj(copy.deepcopy(self.__OPT_HAVING)) query.set_join_obj(copy.deepcopy(self.__OPT_JOIN)) query.set_update_obj(copy.deepcopy(self.__OPT_UPDATE)) query.set_duplicate_obj(copy.deepcopy(self.__OPT_DUPLICATE)) return query def snapshot(self): ''' @name 创建查询构造器快照 @author Zhj<2025-01-07> @return SqliteEasy|None ''' snapshot = Snapshot() snapshot.pk = self.__PK snapshot.from_sub_query = self.__FROM_SUB_QUERY snapshot.table = self.__DB_TABLE snapshot.prefix = self.__OPT_PREFIX snapshot.alias = self.__OPT_ALIAS snapshot.where = copy.deepcopy(self.__OPT_WHERE) snapshot.limit = copy.deepcopy(self.__OPT_LIMIT) snapshot.order = copy.deepcopy(self.__OPT_ORDER) snapshot.field = copy.deepcopy(self.__OPT_FIELD) snapshot.group = copy.deepcopy(self.__OPT_GROUP) snapshot.having = copy.deepcopy(self.__OPT_HAVING) snapshot.join = copy.deepcopy(self.__OPT_JOIN) snapshot.update = copy.deepcopy(self.__OPT_UPDATE) snapshot.duplicate = copy.deepcopy(self.__OPT_DUPLICATE) return snapshot def restore_from_snapshot(self, snapshot): ''' @name 通过快照还原查询构造器 @author Zhj<2025-01-07> @return self ''' if not isinstance(snapshot, Snapshot): raise PanelError('snapshot must a instance of core.include.sqlite_server.Snapshot') self.set_pk(snapshot.pk) if snapshot.from_sub_query: self.from_sub_query(snapshot.table) else: self.table(snapshot.table) self.prefix(snapshot.prefix) self.alias(snapshot.alias) self.set_where_obj(copy.deepcopy(snapshot.where)) self.set_limit_obj(copy.deepcopy(snapshot.limit)) self.set_order_obj(copy.deepcopy(snapshot.order)) self.set_field_obj(copy.deepcopy(snapshot.field)) self.set_group_obj(copy.deepcopy(snapshot.group)) self.set_having_obj(copy.deepcopy(snapshot.having)) self.set_join_obj(copy.deepcopy(snapshot.join)) self.set_update_obj(copy.deepcopy(snapshot.update)) self.set_duplicate_obj(copy.deepcopy(snapshot.duplicate)) # self.set_alter_table_obj(copy.deepcopy(snapshot.alter_table)) return self def explain_raw_sql(self, raw_sql, bind_params=()): ''' @name 分析SQL语句 @author Zhj<2022-07-21> @param raw_sql SQL语句 @param bind_params 绑定参数 @return string|None ''' ret = self.query('EXPLAIN QUERY PLAN {}'.format(raw_sql), _to_tuple(bind_params)) if ret is None: return None return "\n".join(list(map(lambda x: x.get('detail', ''), ret))) def __to_raw_sql(self, raw_sql, bind_params=()): ''' @name 将sql语句和绑定参数合并 @author Zhj<2022-07-18> @param raw_sql sql语句(含有参数绑定占位符) @param bind_params 绑定参数 @return string ''' return reduce(lambda x, y: search_question_reg.sub(str(y) if is_number(y) else "'%s'" % y, x, 1), bind_params, repr(raw_sql)[1:-1]) def __build_sql(self): ''' @name 构建sql查询语句与绑定参数 @author Zhj<2022-07-18> @return (sql语句, 绑定参数) ''' fields = self.__OPT_FIELD.build() join_condition = self.__OPT_JOIN.build() where_condition, where_params = self.__OPT_WHERE.build() group_condition, group_params = self.__OPT_GROUP.build() having_condition, having_params = self.__OPT_HAVING.build() order_condition, order_params = self.__OPT_ORDER.build() limit_condition = self.__OPT_LIMIT.build() # 查询语句 raw_sql = 'SELECT {fields} FROM {table_name}' \ '{join_condition}{where_condition}{group_condition}' \ '{order_condition}{having_condition}{limit_condition}'.format_map({ 'fields': fields, 'table_name': self.__build_table_name(), 'join_condition': join_condition, 'where_condition': where_condition, 'group_condition': group_condition, 'having_condition': having_condition, 'order_condition': order_condition, 'limit_condition': limit_condition, }) # 绑定参数 bind_params = () bind_params += where_params bind_params += group_params bind_params += having_params bind_params += order_params return raw_sql, bind_params def __build_table_name(self, contain_alias=True): ''' @name 构建表名 @author Zhj<2022-07-17> @param contain_alias 是否包含别名 @return string ''' table_name = self.__DB_TABLE if self.__FROM_SUB_QUERY else _add_backtick_for_field(self.__DB_TABLE) table_alias = '' if contain_alias: table_alias = '' if self.__OPT_ALIAS is None else ' AS {}'.format( _add_backtick_for_field(self.__OPT_ALIAS) ) return '{}{}'.format(table_name, table_alias) def __get_row_key(self, field): ''' @name 获取字段名对应的字典键名 @author Zhj<2022-07-18> @param field 字段名 @return string|None ''' # 移除字符串两段空白字符 field = str(field).strip() m = match_row_key_reg.match(field) # 不符合字段规则 # 返回None if m is None: # 检查有无设置别名 m = search_row_key_reg.search(field) # 设置了别名 if m: return m.group(1).strip('`') return field # 设置了别名时 # 获取别名 if m.group(2) is not None: return m.group(2).strip('`') # 获取字段名 不包含前缀表名 return m.group(1).strip('`') def __is_autocommit(self): ''' @name 检查是否自动提交事务 @author Zhj<2022-07-18> @return bool ''' if self.__DB is None: return False return self.__DB.is_autocommit() def __is_autovacuum(self): ''' @name 检查是否自动释放空间 @author Zhj<2022-09-27> @return bool ''' if self.__DB is None: return False return self.__DB.is_autovacuum() def __clear(self): ''' @name 清空查询条件 @author Zhj<2022-07-17> @return void ''' self.__PK = None self.__OPT_ALIAS = None self.__OPT_FIELD.clear() self.__OPT_JOIN.clear() self.__OPT_WHERE.clear() self.__OPT_GROUP.clear() self.__OPT_HAVING.clear() self.__OPT_ORDER.clear() self.__OPT_LIMIT.clear() self.__OPT_UPDATE.clear() self.__OPT_DUPLICATE.clear()