4
请教,从 FIFO 队列中取出 SQL 语句,写入到 MySQL,如何可以拉满拉爆性能?
source link: https://www.v2ex.com/t/798537
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
情况: 在 main()
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# 建立全局变量字典
GOLVAR = Manager().dict()
# SQL 处理队列
SQLQueue = Manager().Queue()
# 处理 SQL 队列功能,一个单独进程在运行
ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1)
# 启动
ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd, SQLServerInfo, SQLQueue, GOLVAR)
def AA(someData,sqlqueue):
#略
XXX
sqlCommand = XXX
sqlqueue.put(sqlCommand)
retrun
def BB(someData,sqlqueue):
#略,和 AA 结构一样,最后往队列里 put(sqlCommand)
sqlqueue.put(sqlCommand)
retrun
def CC(someData,sqlqueue):
#略
sqlqueue.put(sqlCommand)
retrun
# 开动制造
while True:
# AA,BB,CC,DD 等处理函数按顺序,循环制造 SQL 语句,运行 AA,BB,CC,DD 等处理数据的函数处理上,其实几乎都不怎么占 CPU,I/O,最后向 Manager().Queue() put 入大量 SQL 语句
# 进去的 SQL 语句只有四种,
# INSERT INTO tblname (x) VALUE (x);
# INSERT INTO ... SELECT FROM XXX(最复杂也就嵌了 3 层);
# UPDATE SET...
# DELETE FROM...
# SQLQueue 量高的时候 1 秒进 4 万条,低的时候,200 秒不进 1 条
time.sleep(100)
# 处理 SQL 队列
def procSQLcmd(sqlinfo, sqlqueue, golvar):
import time
import datetime
from dbutils.pooled_db import PooledDB
import pymysql
from concurrent.futures import ThreadPoolExecutor
from MYFunc import SQLcmdData
from myFunc import colrRedB
from myFunc import TranDicttoSQLcmd
from warnings import filterwarnings
filterwarnings("error", category=pymysql.Warning)
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=600, # 连接池允许的最大连接数,0 和 None 表示不限制连接数
mincached=5, # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
maxcached=5, # 链接池中最多闲置的链接,0 和 None 不限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None 表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=1, # ping MySQL 服务端,检查是否服务可用。
host=sqlinfo['ip'],
port=sqlinfo['port'],
user=sqlinfo['user'],
password=sqlinfo['password'],
database=sqlinfo['database'],
charset=sqlinfo['charset']
)
DBconn = POOL.connection()
def exeCu(conn, sqltext):
try:
cur = conn.cursor()
cur.execute(sqltext)
# cur.commit()
cur.close()
except pymysql.Warning as e:
# print(f'#detial:{str(e)}\n',colrRedB(f"SQL ERR: {sqltext}"))
sqlsqlcmd = sqltext.replace("'","\\'").replace('"','\\"')
resonsql = str(e).replace("'","\\'").replace('"','\\"')
SQLErrorDict = {'sqlcmd': sqlsqlcmd,
'reson': resonsql,
'UpdateTime': datetime.datetime.now().replace(microsecond=0)}
SQLCmd = TranDicttoSQLcmd('MYSQLERRLog', SQLErrorDict, None)
SQLcmdData(sqlinfo, SQLCmd)
return
while True:
if sqlqueue.qsize() == 0:
# 开关
if golvar['stopsqlflag'] == False:
time.sleep(2)
break
# SQL 语句执行,必须按队列 FIFO 顺序写入
while not sqlqueue.empty():
with ThreadPoolExecutor(1) as executor:
executor.submit(exeCu, DBconn, sqlqueue.get())
DBconn.close()
return
请教问题:
1 、这样的设计,写入每秒是 800 ~ 2500 条左右,虽然能做到对 MySQL 服务器写入浪涌的削峰填谷,但 SQLQueue 在峰值的时候,很容易一下就超了 17 万,太多的未写入,也影响了 main()的大循环
2 、从 MySQL 的服务器的性能判断来看,
SHOW STATUS WHERE (Variable_name like '%thre%' OR Variable_name like '%conn%' OR Variable_name like '%cache%');
SHOW PROCESSLIST;
MySQL 服务器其实跟睡着了没区别,瞬时链接数 3,4 个,没有感受到什么事情(是对 PooledDB 的用法有问题?)
3 、以前以为是服务器 I/O 的问题,换 8 核 16 线程 CPU 的机器,换上 SSD,内存 64GB,my.cnf 的 cache 调到 65%,都没有太大改善
4 、请教如何调整做法,从 SQLQueue 取出 SQL 语句怼服务器,可以拉满拉爆?
5 、小范围,小应用,上大工业架构的方式就算了,折腾不起。。。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK