9

手写数据库连接池 - D-booker

 1 year ago
source link: https://www.cnblogs.com/D-booker/p/16398833.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

📕数据库连接池项目

一、项目意义#

在设计前先了解一下数据库连接池的作用:

除了在服务器端增加缓存服务器缓存常用的数据 之外(例如redis),还可以增加连接池,来提高MySQL Server的访问效率,在高并发情况下,大量的 TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手所耗费的 性能时间也是很明显的,增加连接池就是为了减少这一部分的性能损耗

二、环境配置#

MySQ数据库编程环境配置:

在win10的vs项目中用C/C++客户端开发包,vs做如下配置:

1.右键项目属性 - C/C++ - 常规 - 附加包含目录,填写mysql.h头文件的路径

2.右键项目属性 - 链接器 - 常规 - 附加库目录,填写libmysql.lib的路径

3.右键项目属性 - 链接器 - 输入 - 附加依赖项,填写libmysql.lib库的名字

4.把libmysql.dll动态链接库(Linux下后缀名是.so库)放在工程目录下;

如果运行数据库连接文件出现:

image-20220415102021161

说明系统环境变量中没有配置MySQL环境,找不到动态链接,解决方案:在系统环境变量添加MySQL的bin文件夹路径; 也可以在 右键项目属性 - 调试 - 环境,填写 PATH=自己MySQL的bin文件夹路径;

三、项目设计#

所需要实现的数据库连接池功能:连接池有连接数量的起始值和阈值;连接池可以动态自动生成和回收连接池里面的资源;连接池和数据库信息等分离,达到复用简单的效果;

①设计一个数据库的的基本操作类

  • 封装数据库的连接、增删改查操作;
  • 给该基本操作类加上存活时间相关的属性和方法;

②数据库连接池类

  • 因为是一个池对应多个资源(对象)的关系,我们也只需要一个池,设计成单例模式;

  • 保存含有基本操作类一群对象资源;

  • 有资源的初始化、增加、释放操作(释放和存活时间相关)

四、详细代码#

代码结构:

image-20220621230851817

①public.h: 该连接池的全局日志输出,打印一些错误在log中;

#pragma once
/*
* 定义宏等全局定义
*/

#define LOG(str) \
	cout << __FILE__ << ":" << __LINE__ << " " << \
	__TIMESTAMP__ << " : " << str << endl;

②mysql.ini:mysql的详细信息

#数据库连接池的配置文件,和宏定义一样,注意行后面不要有空格
ip=127.0.0.1
port=3306
username=root
password=root
dbname=chat
initSize=10
maxSize=1024
#最大空闲时间默认秒
maxIdleTime=60
#连接超时时间单位是毫秒
connectionTimeout=100

③Connection.h 和 Connection.cpp :数据库操作类 头文件和实现;

Connection.h:

using namespace std;
#include "public.h"

// 数据库操作类
class Connection
{
public:
	// 初始化数据库连接
	Connection();
	// 释放数据库连接资源
	~Connection();
	// 连接数据库
	bool connect(string ip, unsigned short port, string user, string password, string dbname);
	// 更新操作 insert、delete、update
	bool update(string sql);
	// 查询操作:select;
	MYSQL_RES* query(string sql);

	//刷新一下连接的起始空闲时间点
	void refreshAliveTime() { _alivetime = clock(); }//clock()函数当下时间
	// 返回存活的时间
	clock_t getAliveTime() { return clock() - _alivetime; }

private:
	MYSQL* _conn; // 表示和MySQL Server的一条连接
	clock_t _alivetime;// 记录进入空闲状态后的存活时间
};

Connection.cpp:

#include "Connection.h"
#include "public.h"

// 初始化数据库连接
Connection::Connection()
{
	_conn = mysql_init(nullptr);
}
// 释放数据库连接资源
Connection::~Connection()
{
	if (_conn != nullptr)
		mysql_close(_conn);
}
// 连接数据库
bool Connection::connect(string ip, unsigned short port, string username, string password, string dbname)
{
	MYSQL* p = mysql_real_connect(_conn, ip.c_str(), username.c_str(), password.c_str(), dbname.c_str(), port, nullptr, 0);
	return p != nullptr;
}
// 更新操作 insert、delete、update
bool Connection::update(string sql)
{
	if (mysql_query(_conn, sql.c_str()))//如果查询成功,返回0。如果出现错误,返回非0值。
	{
		LOG("更新失败:" + sql);
		return false;
	}
	return true;
}
// 查询操作:select;
MYSQL_RES* Connection::query(string sql)
{
	if (mysql_query(_conn, sql.c_str()))
	{
		LOG("查询失败:" + sql);
		return nullptr;
	}
	return mysql_use_result(_conn);
}

④MySQLConnectionPool.h和MySQLConnectionPool.cpp:连接池类头文件和实现

MySQLConnectionPool.h:

#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <thread>
#include <memory>
#include <functional>
#include "Connection.h"

using namespace std;
/*
* 实现连接池模块
*/
class ConnectionPool
{
public:
	//获取连接池对象实例
	static ConnectionPool* getConnectionPool();
	// 消费者线程函数:给用户连接,归还时放回连接池
	shared_ptr<Connection> getConnection();

private:
	//单例#1 构造函数私有化
	ConnectionPool(); 

	//从配置文件中加载配置
	bool loadConfigFile();

	//生产者线程函数:运行独立的线程中,负责生产新连接,放在类内方便访问成员变量
	void produceConnectionTask();

	//回收线程函数:扫描超过maxIdleTime时间的空闲连接,进行多余的连接回收
	void scannerConnectionTask();

	string _ip;// mysql ip
	unsigned short _port; //mysql 端口 默认3306
	string _username;// mysql 用户名;
	string _password;// mysql登录秘密
	string _dbname; // 数据库名称
	int _initSize;// 连接池的初始连接量
	int _maxSize;// 连接池的最大连接量
	int _maxIdleTime; //连接池最大空闲时间
	int _connectionTimeout;//连接池获取连接的超时时间

	queue<Connection*> _connectionQue; //存储mysql连接的队列,必须是线程安全的;
	mutex _queueMutex;//维护连接队列的线程安全互斥锁
	atomic_int _connectionCnt;// 记录连接所创建的connection连接的总数量,考虑了连接生产消费数量变化的线程安全问题
	condition_variable cv; //设置条件变量,用于连接 生产线程和消费线程的通信
};


MySQLConnectionPool.cpp:

// MySQlConnectionPool.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//避免重复包含头文件
#ifndef __COMPLEX__
#define __COMPLEX__

#include <iostream>
#include <string>
#include "MySQLConnectionPool.h"
#include "public.h"

#endif;
// 线程安全的懒汉单例函数接口
ConnectionPool* ConnectionPool::getConnectionPool()
{
	static ConnectionPool pool; // 静态变量实现lock和unlock,拿单例的线程池;
	return &pool;
}

// 从配置文件中加载配置项
bool ConnectionPool::loadConfigFile()
{
	FILE* pf = fopen("mysql.ini", "r");
	if (pf == nullptr)
	{
		LOG("mysql.ini file is not exist!");
		return false;
	}

	while (!feof(pf))//末尾查一下
	{
		char line[1024] = { 0 };
		fgets(line, 1024, pf);
		string str = line;
		int idx = str.find('=', 0);//找出第一个出现=号的下标
		if (idx == -1)//找不到,无效配置项
		{
			continue;
		}

		//实际中是由\n结尾的,password=root\n
		int endidx = str.find('\n', idx);
		string key = str.substr(0, idx); //参数意义:截取的起点以及截取长度
		string value = str.substr(idx + 1, endidx - idx - 1);

		//存值
		if (key == "ip") _ip = value;
		else if (key == "port") _port = atoi(value.c_str());
		else if (key == "username") _username = value;
		else if (key == "password") _password = value;
		else if (key == "dbname") _dbname = value;
		else if (key == "initSize") _initSize = atoi(value.c_str());
		else if (key == "maxSize") _maxSize = atoi(value.c_str());
		else if (key == "maxIdleTime") _maxIdleTime = atoi(value.c_str());
		else if (key == "connectionTimeout") _connectionTimeout = atoi(value.c_str());

	}
}

// 连接池的构造
ConnectionPool::ConnectionPool()
{
	//加载配置项
	if (!loadConfigFile())
	{
		return;
	}

	//创建初始数量的连接
	for (int i = 0; i < _initSize; ++i)
	{
		Connection* p = new Connection();
		p->connect(_ip, _port, _username, _password, _dbname);
		p->refreshAliveTime();// 刷新一下开始空闲起始时间;
		_connectionQue.push(p);
		_connectionCnt++;
	}

	//需要启动一个新线程,作为连接的生产者(生产者线程) 
	//c++的线程函数在linux里面底层也是pthread_creat,需要传入c接口,所以传入类方法需要绑定
	thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));
	produce.detach();

	//启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行多余的连接回收
	thread scanner(std::bind(&ConnectionPool::scannerConnectionTask, this));
	scanner.detach();
}

//生产者线程:运行独立的线程中,负责生产新连接,放在类内方便访问成员变量
void ConnectionPool::produceConnectionTask()
{
	for (;;)
	{
		unique_lock<mutex> lock(_queueMutex);
		while (!_connectionQue.empty())
		{
			cv.wait(lock); //队列不为空,此处生产线程进入等待状态,释放锁
		}

		//可以生产新连接,创建新连接
		if (_connectionCnt < _maxSize) { 
			Connection* p = new Connection();
			p->connect(_ip, _port, _username, _password, _dbname);
			p->refreshAliveTime();// 刷新一下开始空闲起始时间;		
			_connectionQue.push(p);
			_connectionCnt++;
		}

		//通知消费者线程可以消费连接了
		cv.notify_all();
	}
}

// 消费者线程:给用户连接,从连接池中获取一个可用的空闲连接
shared_ptr<Connection> ConnectionPool::getConnection()
{
	unique_lock<mutex> lock(_queueMutex);
	//if (_connectionQue.empty())//空的就让生产者生产
	//{
	//	//不可以用sleep,sleep是直接睡,而wait_for是被通知就可以马上继续走
	//	cv.wait_for(lock, chrono::milliseconds(_connectionTimeout));//毫秒,超过时间没有被唤醒的话也会出来
	//	if (_connectionQue.empty())
	//	{
	//		LOG("获取空闲连接超时了!!!获取连接失败");
	//		return nullptr;
	//	}
	//}

	//上述没有考虑好,有可能等待过程中是被唤醒的,但是拿锁慢,还是被拿走了锁
	//优化一下:
	while (_connectionQue.empty())
	{
		if (cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connectionTimeout)))
		{
			//是真的超时了, 并且连接池为空
			if (_connectionQue.empty())
			{
				LOG("获取空闲连接超时了!!!获取连接失败");
				return nullptr;
			}			
		}
	}

	//有连接在池子里
	/*
	shared_ptr智能指针析构时,默认会调用connection析构函数,connection就会被close
	这里就需要自定义share_ptr的释放资源方式:把connection直接归还到_connectionQue中;
	*/
	shared_ptr<Connection> sp(_connectionQue.front(),
		[&](Connection* pcon) {
			//这里是在服务器(多线程)消费者线程中调用的,涉及了共享数据,所以一定要考虑队列的线程安全操作
			unique_lock<mutex> lock(_queueMutex);
			pcon->refreshAliveTime();// 刷新一下开始空闲起始时间;
			_connectionQue.push(pcon);
		});
	_connectionQue.pop();
	//if (_connectionQue.empty()) //这样写也可以
	//{
	//	cv.notify_all();//消费完连接后发现队列为空,通知生产者线程;
	//}
	cv.notify_all();//消费完连接后,通知生产者线程检查线程池是否为空;
	return sp;
}

//回收线程函数:扫描超过maxIdleTime时间的空闲连接,进行多余的连接回收
void ConnectionPool::scannerConnectionTask()
{
	for (;;)
	{
		//通过sleep模拟定时效果
		this_thread::sleep_for(chrono::seconds(_maxIdleTime));
	
		// 扫描整个队列,释放多余连接
		unique_lock<mutex> lock(_queueMutex);
		while (_connectionCnt > _initSize)
		{
			Connection* p = _connectionQue.front();
			//这里都释放?应该释放大于initSize以上的?
			if (p->getAliveTime() > _maxIdleTime * 1000) //##队头的空闲时间是最长的,只用看队头就行
			{
				_connectionQue.pop();
				_connectionCnt--;
				delete p;//调用connectin析构函数
			}
			else
			{
				break;//队头都小于,后面肯定小;
			}
		}
	}
}

五、代码测试#

进行压力测试对比一下使用连接池和不使用连接池的效果;

测试代码:main函数手动测试

#include <iostream>
#include <thread>
#include "Connection.h"
#include "MySQLConnectionPool.h"


using namespace std;


int main()
{
	/*
	 * 数据库测试
	*/
	//Connection conn;
	//char sql[1024] = { 0 };
	//sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//conn.update(sql);

	//压力测试:
	// 
	//①不用连接池,单线程,更改数据1000、5000、10000
	clock_t begin = clock();
	for (int i = 0; i < 1000; ++i) {
		Connection conn;
		char sql[1024] = { 0 };
		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
		conn.connect("127.0.0.1", 3306, "root", "root", "chat");
		conn.update(sql);
	}
	clock_t end = clock();
	cout << end - begin << "ms" << endl;

	//②用连接池单线程,更改数据1000、5000、10000
	//clock_t begin = clock();
	//ConnectionPool *cp = ConnectionPool::getConnectionPool();
	//for (int i = 0; i < 5000; ++i) {
	//	shared_ptr<Connection> sp= cp->getConnection();
	//	char sql[1024] = { 0 };
	//	sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//	sp->update(sql);
	//}
	//clock_t end = clock();
	//cout << end - begin << "ms" << endl;

	//③不用连接池的4线程
	//不能在多线程中同时连接数据库,是非法的,需要先在外面声明连接
	//Connection conn;
	//conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//clock_t begin = clock();
	//thread t1([]() {
	//	for (int i = 0; i < 250; ++i) {
	//		Connection conn;
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//		conn.update(sql);
	//	}
	//	});
	//thread t2([]() {
	//	for (int i = 0; i < 250; ++i) {
	//		Connection conn;
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//		conn.update(sql);
	//	}
	//	});
	//thread t3([]() {
	//	for (int i = 0; i < 250; ++i) {
	//		Connection conn;
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//		conn.update(sql);
	//	}
	//	});
	//thread t4([]() {
	//	for (int i = 0; i < 250; ++i) {
	//		Connection conn;
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		conn.connect("127.0.0.1", 3306, "root", "root", "chat");
	//		conn.update(sql);
	//	}
	//	});
	//t1.join();
	//t2.join();
	//t3.join();
	//t4.join();
	//clock_t end = clock();
	//cout << end - begin << "ms" << endl;

	//④用连接池的4线程
	//clock_t begin = clock();
	//thread	t1([]() {
	//	ConnectionPool* cp = ConnectionPool::getConnectionPool();
	//	for (int i = 0; i < 250; ++i) {
	//		shared_ptr<Connection> sp = cp->getConnection();
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		sp->update(sql);
	//	}
	//	});
	//thread	t2([]() {
	//	ConnectionPool* cp = ConnectionPool::getConnectionPool();
	//	for (int i = 0; i < 250; ++i) {
	//		shared_ptr<Connection> sp = cp->getConnection();
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		sp->update(sql);
	//	}
	//	});
	//thread	t3([]() {
	//	ConnectionPool* cp = ConnectionPool::getConnectionPool();
	//	for (int i = 0; i < 250; ++i) {
	//		shared_ptr<Connection> sp = cp->getConnection();
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		sp->update(sql);
	//	}
	//	});
	//thread	t4([]() {
	//	ConnectionPool* cp = ConnectionPool::getConnectionPool();
	//	for (int i = 0; i < 250; ++i) {
	//		shared_ptr<Connection> sp = cp->getConnection();
	//		char sql[1024] = { 0 };
	//		sprintf(sql, "insert into user(name, age, sex) values ('%s', %d, '%s')", "zhang san", 20, "male");
	//		sp->update(sql);
	//	}
	//	});
	//t1.join();
	//t2.join();
	//t3.join();
	//t4.join();
	//clock_t end = clock();
	//cout << end - begin << "ms" << endl;

	return 0;
}

刚开始连接速度,插入速度极慢的原因:

是因为 mysql8.0 一些设置是默认开启的(5.7 是默认关闭的),而这些设置可能会严重影响数据库性能

执行以下优化:

最后我还是换成了5.7的版本进行测试:

数据量 未使用连接池所耗时间 使用连接池所耗时间
1000 单线程:1886ms 四线程:495ms 单线程:1078ms 四线程:406ms
5000 单线程:10032ms 四线程:2368ms 单线程:5328ms 四线程:2033ms
10000 单线程:19407ms 四线程:4579ms 单线程:10532ms四线程:4041ms

锻炼的技术点:MySQL数据库编程、单例模式、queue队列容器、C++11多线程编程、线程互斥、线程同步通信和 unique_lock、基于CAS的原子整形、智能指针shared_ptr、lambda表达式、生产者-消费者线程模型;


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK