from functools import wraps

import pymysql
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import declarative_base, sessionmaker

from utils.date_utils import DateUtils


class DbConn(object):
    """Wrapper around a SQLAlchemy engine and session factory for MySQL.

    An instance owns:

    * ``_engine``      -- the current SQLAlchemy engine,
    * ``_Session``     -- a ``sessionmaker`` bound to that engine,
    * ``read_session`` -- one long-lived session used by ``query_sql`` (when
      ``new_session`` is false) and by ``update``.

    Every public operation goes through ``_retry``, which re-runs ``init()``
    after a failure and tries once more.
    """

    Base = declarative_base()

    # Remote database.
    # NOTE(review): credentials are hard-coded in source control; consider
    # loading them from configuration or the environment instead.
    ip = '114.215.109.124'
    user = 'fumeRemote'
    password = 'feiyu2023'
    port = 3306
    data_base_name = 'fume'

    # Local database (alternative settings, kept for development use).
    # ip = 'localhost'
    # user = 'root'
    # password = '123456'
    # port = 3306
    # data_base_name = 'fume'

    def __init__(self) -> None:
        self.init()

    @staticmethod
    def _log_error(text, exc) -> None:
        """Print an error line in the file's ``[ERROR] [<time>]: <text>`` format."""
        msg = f"[ERROR] [{DateUtils.now_time()}]: {text}"
        print(msg, exc)

    def init(self):
        """(Re)create the engine, the session factory and the read session.

        Errors are printed and swallowed. On failure the previously
        configured engine/session attributes (if any) are left untouched.
        """
        previous_engine = getattr(self, '_engine', None)
        engine = None
        try:
            engine = create_engine(
                f"mysql+pymysql://{self.user}:{self.password}@{self.ip}:{self.port}/{self.data_base_name}?charset=utf8",
                pool_recycle=3600,
                pool_size=50)
            DbConn.Base.metadata.create_all(engine)
            self._Session = sessionmaker(bind=engine)
            self.read_session = self._Session()
            self._engine = engine
        except Exception as e:
            self._log_error('数据库连接创建失败', e)
            # Do not leave a half-initialised pool behind.
            if engine is not None and engine is not getattr(self, '_engine', None):
                self._dispose_quietly(engine)
            return

        # Every reconnect used to leave the old engine (and its pool of up to
        # 50 connections) alive. Dispose it now that the replacement is in
        # place; connections still checked out by live sessions are not
        # force-closed by dispose().
        if previous_engine is not None and previous_engine is not engine:
            self._dispose_quietly(previous_engine)

    def _dispose_quietly(self, engine) -> None:
        """Dispose ``engine``'s connection pool, reporting but not raising errors."""
        try:
            engine.dispose()
        except Exception as e:
            self._log_error('数据库连接创建失败', e)

    def _retry(self, func):
        """Call ``func(self)``; on any exception re-initialise and retry.

        At most two attempts are made, and ``init()`` is invoked after each
        failed attempt.

        Returns:
            The value returned by ``func``, or ``None`` when every attempt
            raised.
        """
        for _attempt in range(2):
            try:
                return func(self)
            except Exception as e:
                self._log_error('数据库获取连接错误', e)
                # Try to reconnect before the next attempt.
                self.init()
        # Retries exhausted; callers receive None.
        return None

    def _create_session(self):
        """Create and return a new session from the current session factory."""
        return self._Session()

    def query_sql(self, func, new_session=False):
        """Run a query callback.

        Args:
            func: Callable that receives a session and returns the result.
            new_session: When true, ``func`` runs in a fresh session that is
                closed afterwards; otherwise the shared ``read_session`` is
                used.

        Returns:
            Whatever ``func`` returns, or ``None`` if all attempts failed.
        """
        if new_session:
            def f(self):
                with self._create_session() as s:
                    return func(s)
        else:
            def f(self):
                return func(self.read_session)
        return self._retry(f)

    def insert(self, object):
        """Insert a single mapped object in its own session and commit.

        The parameter name shadows the ``object`` builtin; it is kept so
        existing keyword callers continue to work.
        """
        def f(self):
            with self._create_session() as session:
                session.add(object)
                session.commit()
        return self._retry(f)

    def insert_list(self, object_list):
        """Insert several mapped objects in one new session and commit."""
        def f(self):
            with self._create_session() as session:
                session.add_all(object_list)
                session.commit()
        return self._retry(f)

    def update(self):
        """Commit pending changes on the shared ``read_session``."""
        def f(self):
            self.read_session.commit()
        self._retry(f)


db_conn = DbConn()


def commit_sql(func):
    """Decorator that runs ``func`` inside a committed transaction.

    The wrapped callable is invoked as ``func(self, session, data_obj)``.
    The resulting wrapper has the signature ``inner(self, data_obj)`` and
    returns ``True`` after a successful commit, or ``False`` when a
    ``SQLAlchemyError`` was raised (the transaction is rolled back). Other
    exceptions propagate; the session is closed in every case.

    A new session is opened for every call rather than once at decoration
    time, so calls do not share session state and always use the session
    factory currently configured on ``db_conn``.
    """
    @wraps(func)
    def inner(self, data_obj):
        session = db_conn._create_session()
        try:
            session.begin()
            func(self, session, data_obj)
            session.commit()
            return True
        except SQLAlchemyError:
            session.rollback()
            return False
        finally:
            session.close()
    return inner