4

请教,从 FIFO 队列中取出 SQL 语句,写入到 MySQL,如何可以拉满拉爆性能?

 2 years ago
source link: https://www.v2ex.com/t/798537
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

V2EX  ›  Python

请教,从 FIFO 队列中取出 SQL 语句,写入到 MySQL,如何可以拉满拉爆性能?

  uti6770werty · 10 小时 30 分钟前 · 625 次点击

情况: 在 main()

    from multiprocessing import Manager
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    
    # 建立全局变量字典
    GOLVAR = Manager().dict()
    # SQL 处理队列
    SQLQueue = Manager().Queue()
     
    # 处理 SQL 队列功能,一个单独进程在运行
    ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1)
    # 启动
    ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd, SQLServerInfo, SQLQueue, GOLVAR)
     
    def AA(someData,sqlqueue):
     	#略
     	XXX
     	sqlCommand = XXX
     	sqlqueue.put(sqlCommand)
     	retrun
    
    def BB(someData,sqlqueue):
     	#略,和 AA 结构一样,最后往队列里 put(sqlCommand)
     	sqlqueue.put(sqlCommand)
     	retrun
     		
    def CC(someData,sqlqueue):
     	#略
     	sqlqueue.put(sqlCommand)
     	retrun    
     		
   # 开动制造
    while True:
  	# AA,BB,CC,DD 等处理函数按顺序,循环制造 SQL 语句,运行 AA,BB,CC,DD 等处理数据的函数处理上,其实几乎都不怎么占 CPU,I/O,最后向 Manager().Queue() put 入大量 SQL 语句
     		
     	# 进去的 SQL 语句只有四种,
     	# INSERT INTO tblname (x) VALUE (x);
     	# INSERT INTO ... SELECT FROM XXX(最复杂也就嵌了 3 层);
     	# UPDATE SET...
     	# DELETE FROM...
     		
     	# SQLQueue 量高的时候 1 秒进 4 万条,低的时候,200 秒不进 1 条
       	time.sleep(100)

# 处理 SQL 队列
def procSQLcmd(sqlinfo, sqlqueue, golvar):
    import time
    import datetime
    from dbutils.pooled_db import PooledDB
    import pymysql
    from concurrent.futures import ThreadPoolExecutor
    from MYFunc import SQLcmdData
    from myFunc import colrRedB
    from myFunc import TranDicttoSQLcmd

    from warnings import filterwarnings
    filterwarnings("error", category=pymysql.Warning)

    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=600,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
        mincached=5,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0 和 None 不限制
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
        setsession=[],  # 开始会话前执行的命令列表。
        ping=1,  # ping MySQL 服务端,检查是否服务可用。
        host=sqlinfo['ip'],
        port=sqlinfo['port'],
        user=sqlinfo['user'],
        password=sqlinfo['password'],
        database=sqlinfo['database'],
        charset=sqlinfo['charset']
    )

    DBconn = POOL.connection()
    
    def exeCu(conn, sqltext):
        try:
            cur = conn.cursor()
            cur.execute(sqltext)
            # cur.commit()
            cur.close()
        except pymysql.Warning as e:
            # print(f'#detial:{str(e)}\n',colrRedB(f"SQL ERR: {sqltext}"))
            sqlsqlcmd = sqltext.replace("'","\\'").replace('"','\\"')
            resonsql = str(e).replace("'","\\'").replace('"','\\"')
            SQLErrorDict = {'sqlcmd': sqlsqlcmd,
                            'reson': resonsql,
                            'UpdateTime': datetime.datetime.now().replace(microsecond=0)}
            SQLCmd = TranDicttoSQLcmd('MYSQLERRLog', SQLErrorDict, None)
            SQLcmdData(sqlinfo, SQLCmd)
        return

    while True:
        if sqlqueue.qsize() == 0:
            # 开关
            if golvar['stopsqlflag'] == False:
                time.sleep(2)
                break

        # SQL 语句执行,必须按队列 FIFO 顺序写入
        while not sqlqueue.empty():
            with ThreadPoolExecutor(1) as executor:
                executor.submit(exeCu, DBconn, sqlqueue.get())

    DBconn.close()
    return		
请教问题:
1 、这样的设计,写入每秒是 800 ~ 2500 条左右,虽然能做到对 MySQL 服务器写入浪涌的削峰填谷,但 SQLQueue 在峰值的时候,很容易一下就超了 17 万,太多的未写入,也影响了 main()的大循环
2 、从 MySQL 的服务器的性能判断来看,
SHOW STATUS WHERE (Variable_name like '%thre%' OR Variable_name like '%conn%' OR Variable_name like '%cache%');
SHOW PROCESSLIST; 
 MySQL 服务器其实跟睡着了没区别,瞬时链接数 3,4 个,没有感受到什么事情(是对 PooledDB 的用法有问题?)
3 、以前以为是服务器 I/O 的问题,换 8 核 16 线程 CPU 的机器,换上 SSD,内存 64GB,my.cnf 的 cache 调到 65%,都没有太大改善
4 、请教如何调整做法,从 SQLQueue 取出 SQL 语句怼服务器,可以拉满拉爆?
5 、小范围,小应用,上大工业架构的方式就算了,折腾不起。。。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK