| import aiomysql |
|
|
| from __init__ import mysql_option |
|
|
|
|
| class AIOMYSQL: |
| def __init__(self) -> None: |
| self.pool = None |
|
|
| async def init_pool(self): |
| try: |
| __pool = await aiomysql.create_pool( |
| host=mysql_option["HOST"], |
| port=mysql_option["PORT"], |
| user=mysql_option["USERNAME"], |
| password=mysql_option["PASSWORD"], |
| db=mysql_option["DATABASE"], |
| charset="utf8", |
| autocommit=False, |
| minsize=5, |
| maxsize=10, |
| cursorclass=aiomysql.DictCursor, |
| ) |
| return __pool |
| except: |
| raise ("aiomysql create_pool error") |
|
|
| async def get_cursor(self): |
| conn = await self.pool.acquire() |
| cur = await conn.cursor() |
| return conn, cur |
|
|
| async def query(self, sql, param=None): |
| conn, cur = await self.get_cursor() |
| try: |
| await cur.execute(sql, param) |
| except: |
| await cur.rollback() |
| print("aiomysql query error") |
| else: |
| return await cur.fetchall() |
| finally: |
| if cur: |
| await cur.close() |
| |
| await self.pool.release(conn) |
|
|
| async def update(self, sql, param=None): |
| conn, cur = await self.get_cursor() |
| try: |
| await cur.execute(sql, param) |
| except: |
| await cur.rollback() |
| print("aiomysql query error") |
| else: |
| await conn.commit() |
| finally: |
| if cur: |
| await cur.close() |
| |
| await self.pool.release(conn) |
|
|
|
|
| async def get_aiomysql_instance(): |
| instance = AIOMYSQL() |
| instance.pool = await instance.init_pool() |
| return instance |
|
|