riku
2024-01-10 a9e8e27e0503552b7b2a99c821da732175d4f071
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import pymysql
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import declarative_base, sessionmaker
from utils.date_utils import DateUtils
 
 
class DbConn(object):
    """Wrapper around a SQLAlchemy engine, session factory and shared read session.

    The constructor calls :meth:`init`, which builds the engine from the
    class-level connection settings, runs ``Base.metadata.create_all`` and
    opens one long-lived session (``read_session``).  Every public operation
    goes through :meth:`_retry`, which prints errors instead of raising them
    and returns ``None`` when all attempts fail.
    """

    # Declarative base whose metadata is passed to create_all() in init();
    # presumably subclassed by the ORM models defined elsewhere.
    Base = declarative_base()

    # Remote database.
    # NOTE(review): credentials are hard-coded in source control; consider
    # loading them from environment variables or a config file instead.
    ip = '114.215.109.124'
    user = 'fumeRemote'
    password = 'feiyu2023'
    port = 3306
    data_base_name = 'fume'

    # Local database.
    # ip = 'localhost'
    # user = 'root'
    # password = '123456'
    # port = 3306
    # data_base_name = 'fume'

    def __init__(self) -> None:
        self.init()

    @staticmethod
    def _log_error(message, error):
        """Print ``message`` and ``error`` in the module's ``[ERROR] [time]: ...`` format."""
        print('[ERROR] ' + '[' + DateUtils.now_time() + ']: ' + message, error)

    def init(self):
        """(Re)create the engine, the session factory and the shared read session.

        On success the previously held read session is closed and the
        previous engine is disposed, so repeated reconnects do not pile up
        connection pools.  On failure the error is printed and whatever
        engine/session objects existed before the call are left in place.
        """
        new_engine = None
        try:
            new_engine = create_engine(
                f"mysql+pymysql://{self.user}:{self.password}@{self.ip}:{self.port}/{self.data_base_name}?charset=utf8",
                pool_recycle=3600, pool_size=50)
            DbConn.Base.metadata.create_all(new_engine)
            session_factory = sessionmaker(bind=new_engine)
            read_session = session_factory()
        except Exception as e:
            self._log_error('数据库连接创建失败', e)
            if new_engine is not None:
                # The half-initialised engine is never stored; release its pool.
                try:
                    new_engine.dispose()
                except Exception as dispose_error:
                    self._log_error('旧数据库连接释放失败', dispose_error)
            return

        # Swap the new objects in first, then release the old ones, so the
        # instance never points at a disposed engine.
        old_session = getattr(self, 'read_session', None)
        old_engine = getattr(self, '_engine', None)
        self._engine = new_engine
        self._Session = session_factory
        self.read_session = read_session

        # Close the session before disposing the engine so its connection is
        # returned to the pool that is about to be discarded.
        for resource, release in ((old_session, 'close'), (old_engine, 'dispose')):
            if resource is None:
                continue
            try:
                getattr(resource, release)()
            except Exception as e:
                # Cleanup is best-effort; the new connection is already usable.
                self._log_error('旧数据库连接释放失败', e)

    def _retry(self, func):
        """Call ``func(self)``; on any exception reconnect and try once more.

        Args:
            func: callable taking this ``DbConn`` instance as its only argument.

        Returns:
            Whatever ``func`` returns, or ``None`` if both attempts raised.
            Exceptions are printed, never propagated to the caller.
        """
        last_error = None
        for _ in range(2):
            try:
                return func(self)
            except Exception as e:
                last_error = e
                self._log_error('数据库获取连接错误', e)
                # Rebuild engine/sessions so the next attempt (or the next
                # call, after the final failure) starts from a fresh connection.
                self.init()
        self._log_error('数据库获取连接错误, 超过重试次数', last_error)
        return None

    def _create_session(self):
        """Create and return a new session from the current session factory."""
        return self._Session()

    def query_sql(self, func, new_session=False):
        """Run a query callback against a session.

        Args:
            func: callable receiving a session and returning the query result.
            new_session: when true, ``func`` runs in a fresh session that is
                closed afterwards; otherwise the shared ``read_session`` is used.

        Returns:
            The value returned by ``func``, or ``None`` if all retries failed.
        """
        if new_session:
            def run(conn):
                with conn._create_session() as s:
                    return func(s)
        else:
            def run(conn):
                return func(conn.read_session)
        return self._retry(run)

    def insert(self, object):
        """Add a single object in its own session and commit.

        The parameter name shadows the ``object`` builtin; it is kept so that
        existing keyword callers continue to work.
        """
        def run(conn):
            with conn._create_session() as session:
                session.add(object)
                session.commit()
        return self._retry(run)

    def insert_list(self, object_list):
        """Add every object in ``object_list`` in one session and commit."""
        def run(conn):
            with conn._create_session() as session:
                session.add_all(object_list)
                session.commit()
        return self._retry(run)

    def update(self):
        """Commit pending changes held by the shared ``read_session``.

        NOTE(review): if the first commit raises, ``_retry`` calls ``init()``,
        which replaces ``read_session``; the second attempt therefore commits
        the new, empty session rather than the original pending changes.
        """
        def run(conn):
            conn.read_session.commit()
        self._retry(run)
 
 
# Module-level DbConn instance used by commit_sql below. Constructing it runs
# DbConn.init() (engine creation and metadata.create_all) at import time.
db_conn = DbConn()
 
 
def commit_sql(func):
    """Decorator that runs a method inside its own database transaction.

    The wrapped callable is invoked as ``func(self, session, data_obj)``.
    A new session is created for every call, so concurrent or repeated calls
    never share transactional state and always use the connection factory
    that ``db_conn`` currently holds.

    Args:
        func: method taking ``(self, session, data_obj)``.

    Returns:
        A method taking ``(self, data_obj)`` that returns ``True`` when the
        transaction committed and ``False`` when a ``SQLAlchemyError`` caused
        a rollback.  Other exception types propagate after the session is
        closed.
    """
    from functools import wraps

    @wraps(func)
    def inner(self, data_obj):
        # Created per call, not at decoration time: importing the module
        # no longer needs a working connection, and calls stay isolated.
        session = db_conn._create_session()
        try:
            session.begin()
            func(self, session, data_obj)
            session.commit()
            return True
        except SQLAlchemyError as e:
            print('[ERROR] ' + '[' + DateUtils.now_time() + ']: ' + '数据库操作异常:', e)
            session.rollback()
            return False
        finally:
            session.close()
    return inner