60

手写一个 Jedis 以及 JedisPool

 4 years ago
source link: https://www.tuicool.com/articles/Rv2ume3
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

欢迎将公众号设置为星标,技术文章第一时间看到。我们将一如既往精选技术好文,提供有价值的阅读。如果文章对你有帮助,欢迎点个在看鼓励作者。

技术经验交流: 点击入群

Redis、Jedis的用途就不介绍了,不了解的可以先去官网:

https://www.redis.net.cn/tutorial/3501.html

本文章着重讲解如何手动实现一个类似jedis的工具。

1. 源码探索

经过源码的研究,可以发现Jedis的实现是基于Socket,可以从Jedis的set(key,value)方法开始追溯:

public void set(String key, String value) {
this.set(SafeEncoder.encode(key), SafeEncoder.encode(value));
}
public void set(byte[] key, byte[] value) {
this.sendCommand(Command.SET, new byte[][]{key, value});
}
public static void sendCommand(RedisOutputStream os, Protocol.Command command, byte[]... args) {
sendCommand(os, command.raw, args);
}

private static void sendCommand(RedisOutputStream os, byte[] command, byte[]... args) {
try {
os.write((byte)42);
os.writeIntCrLf(args.length + 1);
os.write((byte)36);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
byte[][] var3 = args;
int var4 = args.length;

for(int var5 = 0; var5 < var4; ++var5) {
byte[] arg = var3[var5];
os.write((byte)36);
os.writeIntCrLf(arg.length);
os.write(arg);
os.writeCrLf();
}

} catch (IOException var7) {
throw new JedisConnectionException(var7);
}
}
System.out.println((char)42);
System.out.println(((char)36));

控制台:

*

$

可以大体的看出他得实现过程。把sendCommand方法翻译一下就是:

os.write("*".getBytes());
os.write(//数组长度\r\n);
os.write("$".getBytes());
os.write(//命令长度\r\n)
os.write(命令\r\n);
然后进入循环写入:
$+参数长度\r\n
参数\r\n

整理一下:就是Redis的通信协议,官方文档:https://redis.io/topics/protocol 。

总结一下就是以下几点:

*3   // 数据一共有三个数组
//数组1
$6 //下行为6个长度的字符串
APPEND
//数组2
$5 // 下行为5个长度的字符串
fantj
//数组3
$3 // 下行为3个长度的字符串
666

手写Jedis

通过源码可以看到,它在Connection类中进行sendCommend,那我们也一样:

Connection.java

负责与Redis的Server端建立连接并获取反馈信息。

package com.fantj.jedis.connect;

import com.fantj.jedis.protocol.Protocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
* 连接类
* 在这里进行创建连接并处理IO请求,用inputStream进行数据回显,
* 提供OutputStream给协议层,以便让其给服务端发送命令
*/

public class Connection {
private String host = "localhost";
private int port = 6379;
private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;

public Connection() {
}

public Connection(String host) {
this.host = host;
}

public Connection(String host, int port) {
this.host = host;
this.port = port;
}
public Connection sendCommand(Protocol.Command cmd, byte[]... args) {
connect();
Protocol.sendCommand(outputStream, cmd, args);
return this;
}

private void connect() {

try {
if (socket == null) { //IO复用
socket = new Socket(host, port);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
}
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 操作状态的返回
* 比如:SET 操作成功返回 +OK
*/

public String getStatus() {
byte[] bytes = new byte[1024];
try {
socket.getInputStream().read(bytes);
} catch (IOException e) {
e.printStackTrace();
}
return new String(bytes);
}
}

而真正的sendCommend实现是在Protocol类中实现(Jedis源码也是这样):

Protocol.java

负责提供RESP协议支持和拼接。

package com.fantj.jedis.protocol;

import java.io.IOException;
import java.io.OutputStream;

/**
* RESP协议
* 详见; https://redis.io/topics/protocol
*/

public class Protocol {
// jedis后来将这些常量优化为byte,在os进行写出的时候对其进行char转型
private static final String DOLLAR_BYTE = "$";
private static final String ASTERISK_BYTE = "*";
public static final byte PLUS_BYTE = 43;
public static final byte MINUS_BYTE = 45;
public static final byte COLON_BYTE = 58;
private static final String BLANK_BYTE = "\r\n";

/**
* 拼接RESP 并 发送write
*/

public static void sendCommand(OutputStream os,Protocol.Command cmd, byte[] ... args){
// 1. 生成协议 *3 $3 SET $3 key $5 value
StringBuffer stringBuffer = new StringBuffer();
// 1.1 数组长度 *3
stringBuffer.append(ASTERISK_BYTE).append(args.length+1).append(BLANK_BYTE);
// 1.2 命令长度 $3
stringBuffer.append(DOLLAR_BYTE).append(cmd.name().length()).append(BLANK_BYTE);
// 1.3 命令 SET / GET
stringBuffer.append(cmd).append(BLANK_BYTE);
for (byte[] arg: args){
// 1.4 key/value 长度
stringBuffer.append(DOLLAR_BYTE).append(arg.length).append(BLANK_BYTE);
// 1.5 key/value
stringBuffer.append(new String(arg)).append(BLANK_BYTE);
}
// 写出到服务端
try {
os.write(stringBuffer.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}

}


/**
* 定义一个枚举类 存放命令
*/

public static enum Command{
SET , GET , KEYS, APPEND
}}

Client.java

对外提供API。

package com.fantj.jedis.client;

import com.fantj.jedis.connect.Connection;
import com.fantj.jedis.protocol.Protocol;

/**
* 客户端
* 给开发人员使用提供API
*/
public class Client {
private Connection connection;
public Client(String host, int port){
connection = new Connection(host,port);
}

public String set(String key, String value) {
connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes());
return connection.getStatus();
}

public String get(String key) {
connection.sendCommand(Protocol.Command.GET, key.getBytes());
return connection.getStatus();
}


public void set(String key, String value, String nx, String ex, int i) {
connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes(), nx.getBytes(), ex.getBytes(), String.valueOf(i).getBytes());
}
public void append(String key, String value){
connection.sendCommand(Protocol.Command.APPEND, key.getBytes(),value.getBytes());
}
}

Jedis.java

对Client类的进一层封装,留给开发人员使用。

package com.fantj.jedis.client;

public class Jedis extends Client{

public Jedis(String host, int port) {
super(host, port);
}

@Override
public String set(String key, String value) {
return super.set(key, value);
}

@Override
public String get(String key) {
return super.get(key);
}

@Override
public void set(String key, String value, String nx, String ex, int i) {
super.set(key, value, nx, ex, i);
}

@Override
public void append(String key, String value) {
super.append(key, value);
}
}

测试

/**
* 测试我们自己写的客户端
*/

public class Main {
private Jedis client = new Jedis("www.xxx.top",6380);
@Test
public void set()
{
client.set("fantj","fantj");
String result = client.get("fantj");
System.out.println(result);
}
@Test
public void setNx()
{
client.set("fantj","fantj","NX","EX",10000);
String result = client.get("fantj");
System.out.println(result);
}
@Test
public void append()
{
// client.append("fantj","-2019");
String fantj = client.get("fantj");
System.out.println(fantj);
}
@Test
public void testChar()
{
System.out.println((char)42);
System.out.println(((char)36));
}
}

测试都可以通过。

总结一下原理

假设我执行append("fantj","666")这个命令,那客户端的操作过程:

1、RESP协议对命令的分析

*3   // 数据一共有三个数组
//数组1
$6 //下行为6个长度的字符串
APPEND
//数组2
$5 // 下行为5个长度的字符串
fantj
//数组3
$3 // 下行为3个长度的字符串
666

2、然后我们进行拼接

StringBuffer sb = new StringBuffer();
// 注意每个类型表示完后都进行换行
sb.append("*").append("3").append("\r\n");
sb.append("$").append("6").append("\r\n");
sb.append("APPEND").append("\r\n");
sb.append("$").append("5").append("\r\n");
sb.append("fantj").append("\r\n");
sb.append("$").append("3").append("\r\n");
sb.append("666").append("\r\n");

3、然后用os进行写出

os.write(sb.toString().getBytes());

连接池的实现

Pool<T> .java

/**
* 连接池契约
*/

public interface Pool<T> {
/**
* 初始化连接池
* @param maxTotal 最大连接数
* @param maxWaitMillis 最大等待时间
*/

public void init(int maxTotal, long maxWaitMillis);

/**
* 获取连接
* @return 返回jedis对象
*/

public Jedis getResource() throws Exception;

/**
* 释放连接
*/

public void release(T t);
}

JedisPool.java

实现方式与ExcutorThreadPool工作流程类似,注释写的挺全的就不做详细解读了。

package com.fantj.jedis.pool;

import com.fantj.jedis.client.Jedis;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class JedisPool implements Pool<Jedis>{

public JedisPool(String url, int port) {
this.url = url;
this.port = port;
init(maxTotal,maxWaitMillis);
}

public JedisPool(String url, int port, int maxTotal, long maxWaitMillis) {
this.url = url;
this.port = port;
this.maxTotal = maxTotal;
this.maxWaitMillis = maxWaitMillis;
}

private String url;
private int port;
private int maxTotal = 20;
private long maxWaitMillis = 1000;
// 空闲的连接queue
private LinkedBlockingQueue<Jedis> idleWorkQueue = null;
private Queue<Jedis> activeWorkQueue = null;
// 当前连接数量
private AtomicInteger count = new AtomicInteger(0);
@Override
public void init(int maxTotal, long maxWaitMillis) {
maxTotal = maxTotal;
maxWaitMillis = maxWaitMillis;
idleWorkQueue = new LinkedBlockingQueue<>(maxTotal);
activeWorkQueue = new LinkedBlockingQueue<>(maxTotal);
}

@Override
public Jedis getResource() throws Exception {
Jedis jedis = null;
// 1. 记录开始时间,检测超时
long startTime = System.currentTimeMillis();
while (true){
// 2. 从空闲队列中获取连接,如果拿到,一式两份存放到活动队列
jedis = idleWorkQueue.poll();
if (jedis != null){
activeWorkQueue.offer(jedis);
return jedis;
}
// 3. 如果失败,判断池是否满,没满则创建
if (count.get() < maxTotal){
if (count.incrementAndGet() <= maxTotal){
jedis = new Jedis(url,port);
activeWorkQueue.offer(jedis);
System.out.printf("创建了一个新的连接: %s \r\n", jedis.toString());
return jedis;
}else {
count.decrementAndGet();
}
}
// 4. 如果连接池满了,则在超时时间内进行等待
try {
jedis = idleWorkQueue.poll(maxWaitMillis-(System.currentTimeMillis()-startTime), TimeUnit.MILLISECONDS);
if (jedis != null){
activeWorkQueue.offer(jedis);
return jedis;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 5. poll可能被中断,所以在这里再进行超时判断
if (maxWaitMillis < (System.currentTimeMillis()-startTime)){
throw new RuntimeException("JedisPool: jedis connect timeout");
}
}
}

@Override
public void release(Jedis jedis) {
if (activeWorkQueue.remove(jedis)){
idleWorkQueue.offer(jedis);
}
}
}

GitHub地址: https://github.com/fantj2016/easy-jedis

(点击文字可跳转)

1.   深究Spring中Bean的生命周期

2.  深入SpringBoot核心注解原理

3. 线上环境部署概览

4. Springboot Vue shiro 实现前后端分离、权限控制

QZju6ry.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK