import pymysql
|
from sqlalchemy import create_engine
|
from sqlalchemy.exc import SQLAlchemyError
|
from sqlalchemy.orm import declarative_base, sessionmaker
|
from utils.date_utils import DateUtils
|
|
|
class DbConn(object):
    """Wrapper around a SQLAlchemy engine, session factory and one shared session.

    Constructing an instance calls :meth:`init`, which creates the engine,
    runs ``Base.metadata.create_all`` and opens ``read_session``.  All
    public operations go through :meth:`_retry`, which re-runs :meth:`init`
    after a failure and tries once more.
    """

    Base = declarative_base()

    # Remote database.
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from configuration or the environment instead.
    ip = '114.215.109.124'
    user = 'fumeRemote'
    password = 'feiyu2023'
    port = 3306
    data_base_name = 'fume'

    # Local database (kept for development use).
    # ip = 'localhost'
    # user = 'root'
    # password = '123456'
    # port = 3306
    # data_base_name = 'fume'

    # Number of times _retry() runs the callable before giving up.
    _MAX_ATTEMPTS = 2

    def __init__(self) -> None:
        self.init()

    @staticmethod
    def _log_error(text, err):
        """Print ``text`` with the file's ``[ERROR] [<time>]:`` prefix, then ``err``."""
        print('[ERROR] ' + '[' + DateUtils.now_time() + ']: ' + text, err)

    def init(self):
        """Create (or re-create) the engine, session factory and read session.

        On failure the error is printed and the previously installed
        engine/factory/session (if any) are left untouched.  On success the
        new objects are installed and the ones they replace are released, so
        repeated reconnects do not accumulate connection pools.
        """
        engine = None
        try:
            engine = create_engine(
                f"mysql+pymysql://{self.user}:{self.password}@{self.ip}:{self.port}/{self.data_base_name}?charset=utf8",
                pool_recycle=3600,
                pool_size=50,
            )
            DbConn.Base.metadata.create_all(engine)
            session_factory = sessionmaker(bind=engine)
            read_session = session_factory()
        except Exception as e:
            self._log_error('数据库连接创建失败', e)
            if engine is not None:
                # The half-initialised engine will never be used; drop its pool.
                engine.dispose()
            return

        # Swap in the new objects first so a failure while releasing the old
        # ones cannot leave the instance without a usable session.
        stale_session = getattr(self, "read_session", None)
        stale_engine = getattr(self, "_engine", None)
        self._engine = engine
        self._Session = session_factory
        self.read_session = read_session

        for release in (
            getattr(stale_session, "close", None),
            getattr(stale_engine, "dispose", None),
        ):
            if release is None:
                continue
            try:
                release()
            except Exception as e:
                # Best effort: the stale connection may already be broken.
                self._log_error('旧数据库连接释放失败', e)

    def _retry(self, func):
        """Call ``func(self)``, reconnecting and retrying on any exception.

        Every failed attempt is printed and followed by :meth:`init`.  Any
        ``Exception`` triggers a retry, not only connection errors.

        Returns:
            The value returned by ``func``, or ``None`` when every attempt
            failed (the final failure is printed as well).
        """
        last_error = None
        for _ in range(self._MAX_ATTEMPTS):
            try:
                return func(self)
            except Exception as e:
                last_error = e
                self._log_error('数据库获取连接错误', e)
                self.init()
        self._log_error('数据库获取连接错误, 超过重试次数', last_error)
        return None

    def _create_session(self):
        """Return a new session from the current session factory."""
        return self._Session()

    def query_sql(self, func, new_session=False):
        """Run a query callable against a session.

        Args:
            func: Callable taking a session and returning the query result.
            new_session: When true, ``func`` gets a fresh session that is
                closed afterwards; otherwise it gets the shared
                ``read_session``.

        Returns:
            Whatever ``func`` returns, or ``None`` if all attempts failed.
        """
        def run(conn):
            if not new_session:
                # Looked up per attempt so a reconnect's new session is used.
                return func(conn.read_session)
            with conn._create_session() as session:
                return func(session)

        return self._retry(run)

    def insert(self, object):
        """Add one object in a fresh session and commit it."""
        def run(conn):
            with conn._create_session() as session:
                session.add(object)
                session.commit()

        return self._retry(run)

    def insert_list(self, object_list):
        """Add several objects in a fresh session and commit them."""
        def run(conn):
            with conn._create_session() as session:
                session.add_all(object_list)
                session.commit()

        return self._retry(run)

    def update(self):
        """Commit pending changes on the shared ``read_session``.

        NOTE(review): if the first commit fails, :meth:`_retry` replaces
        ``read_session`` before retrying, so the retry commits the new
        (empty) session rather than the original pending changes.
        """
        def run(conn):
            conn.read_session.commit()

        return self._retry(run)
|
|
|
# Module-level shared DbConn instance. Constructing it calls DbConn.init(),
# so the engine and session factory are set up when this module is imported.
db_conn = DbConn()
|
|
|
def commit_sql(func):
    """Decorator that runs a method inside its own committed transaction.

    The wrapped method is called as ``func(self, session, data_obj)``.  A
    new session is obtained from ``db_conn`` for every call, so calls never
    share session state and always use the current session factory (also
    after ``db_conn`` has reconnected).

    Args:
        func: Method taking ``(self, session, data_obj)``.

    Returns:
        A method taking ``(self, data_obj)`` that returns ``True`` when the
        transaction was committed and ``False`` when a ``SQLAlchemyError``
        caused a rollback.  Other exceptions propagate after the session is
        closed.
    """
    import functools  # function-scope import: not in the file's import block

    @functools.wraps(func)
    def inner(self, data_obj):
        # Created per call; a session created once at decoration time would
        # be shared by every invocation of the decorated method.
        session = db_conn._create_session()
        try:
            session.begin()
            func(self, session, data_obj)
            session.commit()
            return True
        except SQLAlchemyError:
            # LogUtils.error("database operation failed:", e)
            session.rollback()
            return False
        finally:
            session.close()

    return inner
|