4

用 IDEA 基于SpringBoot2+ mybatis+Redis实现一个秒杀系统

 1 year ago
source link: https://blog.51cto.com/u_15430445/5544310
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

用 IDEA 基于SpringBoot2+ mybatis+Redis实现一个秒杀系统

推荐 原创

码农小宋 2022-08-04 16:44:19 ©著作权

文章标签 redis spring 限流 文章分类 Java 编程语言 yyds干货盘点 阅读数268

实现一个秒杀系统,采用spring boot 2.x + mybatis+ redis + swagger2 + lombok实现。

先说说基本流程,就是提供一个秒杀接口,然后针对秒杀接口进行限流,限流的方式目前我实现了两种,上次实现的是累计计数方式,这次还有这个功能,并且我增加了令牌桶方式的lua脚本进行限流。

然后不被限流的数据进来之后,加一把分布式锁,获取分布式锁之后就可以对数据库进行操作了。直接操作数据库的方式可以,但是速度会比较慢,咱们直接通过一个初始化接口,将库存数据放到缓存中,然后对缓存中的数据进行操作。

写库的操作采用异步方式,实现的方式就是将操作好的数据放入到队列中,然后由另一个线程对队列进行消费。当然,也可以将数据直接写入mq中,由另一个线程进行消费,这样也更稳妥。

好了,看一下项目的基本结构:

用 IDEA 基于SpringBoot2+ mybatis+Redis实现一个秒杀系统_限流

看一下入口controller类,入口类有两个方法,一个是初始化订单的方法,即秒杀开始的时候,秒杀接口才会有效,这个方法可以采用定时任务自动实现也可以。

初始化后就可以调用placeOrder的方法了。在placeOrder上面有个自定义的注解DistriLimitAnno,这个是我在上篇文章写的,用作限流使用。

采用的方式目前有两种,一种是使用计数方式限流,一种方式是令牌桶,上次使用了计数,咱们这次采用令牌桶方式实现。

package com.hqs.flashsales.controller;

import com.hqs.flashsales.annotation.DistriLimitAnno;
import com.hqs.flashsales.aspect.LimitAspect;
import com.hqs.flashsales.lock.DistributedLock;
import com.hqs.flashsales.limit.DistributedLimit;
import com.hqs.flashsales.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import java.util.Collections;


/**
* @author huangqingshi
* @Date 2019-01-23
*/
@Slf4j
@Controller
public class FlashSaleController {

@Autowired
OrderService orderService;
@Autowired
DistributedLock distributedLock;
@Autowired
LimitAspect limitAspect;
//注意RedisTemplate用的String,String,后续所有用到的key和value都是String的
@Autowired
RedisTemplate<String, String> redisTemplate;

private static final String LOCK_PRE = "LOCK_ORDER";

@PostMapping("/initCatalog")
@ResponseBody
public String initCatalog() {
try {
orderService.initCatalog();
} catch (Exception e) {
log.error("error", e);
}

return "init is ok";
}

@PostMapping("/placeOrder")
@ResponseBody
@DistriLimitAnno(limitKey = "limit", limit = 100, seconds = "1")
public Long placeOrder(Long orderId) {
Long saleOrderId = 0L;
boolean locked = false;
String key = LOCK_PRE + orderId;
String uuid = String.valueOf(orderId);
try {
locked = distributedLock.distributedLock(key, uuid,
"10" );
if(locked) {
//直接操作数据库
// saleOrderId = orderService.placeOrder(orderId);
//操作缓存 异步操作数据库
saleOrderId = orderService.placeOrderWithQueue(orderId);
}
log.info("saleOrderId:{}", saleOrderId);
} catch (Exception e) {
log.error(e.getMessage());
} finally {
if(locked) {
distributedLock.distributedUnlock(key, uuid);
}
}
return saleOrderId;
}

}

令牌桶的方式比直接计数更加平滑,直接计数可能会瞬间达到最高值,令牌桶则把最高峰给削掉了,令牌桶的基本原理就是有一个桶装着令牌,然后又一队人排队领取令牌,领到令牌的人就可以去做做自己想做的事情了,没有领到令牌的人直接就走了(也可以重新排队)。

发令牌是按照一定的速度发放的,所以这样在多人等令牌的时候,很多人是拿不到的。当桶里边的令牌在一定时间内领完后,则没有令牌可领,都直接走了。如果过了一定的时间之后可以再次把令牌桶装满供排队的人领。

基本原理是这样的,看一下脚本简单了解一下,里边有一个key和四个参数,第一个参数是获取一个令牌桶的时间间隔,第二个参数是重新填装令牌的时间(精确到毫秒),第三个是令牌桶的数量限制,第四个是隔多长时间重新填装令牌桶。

-- bucket name
local key = KEYS[1]
-- token generate interval
local intervalPerPermit = tonumber(ARGV[1])
-- grant timestamp
local refillTime = tonumber(ARGV[2])
-- limit token count
local limit = tonumber(ARGV[3])
-- ratelimit time period
local interval = tonumber(ARGV[4])

local counter = redis.call('hgetall', key)

if table.getn(counter) == 0 then
-- first check if bucket not exists, if yes, create a new one with full capacity, then grant access
redis.call('hmset', key, 'lastRefillTime', refillTime, 'tokensRemaining', limit - 1)
-- expire will save memory
redis.call('expire', key, interval)
return 1
elseif table.getn(counter) == 4 then
-- if bucket exists, first we try to refill the token bucket
local lastRefillTime, tokensRemaining = tonumber(counter[2]), tonumber(counter[4])
local currentTokens
if refillTime > lastRefillTime then
-- check if refillTime larger than lastRefillTime.
-- if not, it means some other operation later than this call made the call first.
-- there is no need to refill the tokens.
local intervalSinceLast = refillTime - lastRefillTime
if intervalSinceLast > interval then
currentTokens = limit
redis.call('hset', key, 'lastRefillTime', refillTime)
else
local grantedTokens = math.floor(intervalSinceLast / intervalPerPermit)
if grantedTokens > 0 then
-- ajust lastRefillTime, we want shift left the refill time.
local padMillis = math.fmod(intervalSinceLast, intervalPerPermit)
redis.call('hset', key, 'lastRefillTime', refillTime - padMillis)
end
currentTokens = math.min(grantedTokens + tokensRemaining, limit)
end
else
-- if not, it means some other operation later than this call made the call first.
-- there is no need to refill the tokens.
currentTokens = tokensRemaining
end

assert(currentTokens >= 0)

if currentTokens == 0 then
-- we didn't consume any keys
redis.call('hset', key, 'tokensRemaining', currentTokens)
return 0
else
-- we take 1 token from the bucket
redis.call('hset', key, 'tokensRemaining', currentTokens - 1)
return 1
end
else
error("Size of counter is " .. table.getn(counter) .. ", Should Be 0 or 4.")
end

看一下调用令牌桶lua的JAVA代码,也比较简单:

public Boolean distributedRateLimit(String key, String limit, String seconds) {
Long id = 0L;
long intervalInMills = Long.valueOf(seconds) * 1000;
long limitInLong = Long.valueOf(limit);
long intervalPerPermit = intervalInMills / limitInLong;
// Long refillTime = System.currentTimeMillis();
// log.info("调用redis执行lua脚本, {} {} {} {} {}", "ratelimit", intervalPerPermit, refillTime,
// limit, intervalInMills);
try {
id = redisTemplate.execute(rateLimitScript, Collections.singletonList(key),
String.valueOf(intervalPerPermit), String.valueOf(System.currentTimeMillis()),
String.valueOf(limitInLong), String.valueOf(intervalInMills));
} catch (Exception e) {
log.error("error", e);
}

if(id == 0L) {
return false;
} else {
return true;
}
}

创建两张简单表,一个库存表,一个是销售订单表:

CREATE TABLE `catalog` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(50) NOT NULL DEFAULT '' COMMENT '名称',
`total` int(11) NOT NULL COMMENT '库存',
`sold` int(11) NOT NULL COMMENT '已售',
`version` int(11) NULL COMMENT '乐观锁,版本号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `sales_order` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`cid` int(11) NOT NULL COMMENT '库存ID',
`name` varchar(30) NOT NULL DEFAULT '' COMMENT '商品名称',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

基本已经准备完毕,然后启动程序,打开swagger(http://localhost:8080/swagger-ui.html#),执行初始化方法initCatalog:

日志里边会输出初始化的记录内容,初始化库存为1000:

初始化执行的方法,十分简单,写到缓存中。

@Override
public void initCatalog() {
Catalog catalog = new Catalog();
catalog.setName("mac");
catalog.setTotal(1000L);
catalog.setSold(0L);
catalogMapper.insertCatalog(catalog);
log.info("catalog:{}", catalog);
redisTemplate.opsForValue().set(CATALOG_TOTAL + catalog.getId(), catalog.getTotal().toString());
redisTemplate.opsForValue().set(CATALOG_SOLD + catalog.getId(), catalog.getSold().toString());
log.info("redis value:{}", redisTemplate.opsForValue().get(CATALOG_TOTAL + catalog.getId()));
handleCatalog();
}

我写了一个测试类,启动3000个线程,然后去进行下单请求:

package com.hqs.flashsales;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import java.util.concurrent.TimeUnit;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FlashsalesApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class FlashSalesApplicationTests {

@Autowired
private TestRestTemplate testRestTemplate;

@Test
public void flashsaleTest() {
String url = "http://localhost:8080/placeOrder";
for(int i = 0; i < 3000; i++) {
try {
TimeUnit.MILLISECONDS.sleep(20);
new Thread(() -> {
MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
params.add("orderId", "1");
Long result = testRestTemplate.postForObject(url, params, Long.class);
if(result != 0) {
System.out.println("-------------" + result);
}
}
).start();
} catch (Exception e) {
log.info("error:{}", e.getMessage());
}

}
}

@Test
public void contextLoads() {
}

}

然后开始运行测试代码,查看一下测试日志和程序日志,均显示卖了1000后直接显示SOLD OUT了。分别看一下日志和数据库:

用 IDEA 基于SpringBoot2+ mybatis+Redis实现一个秒杀系统_限流_04
用 IDEA 基于SpringBoot2+ mybatis+Redis实现一个秒杀系统_redis_05

商品库存catalog表和订单明细表sales_order表,都是1000条,没有问题。

用 IDEA 基于SpringBoot2+ mybatis+Redis实现一个秒杀系统_限流_06

通过采用分布式锁和分布式限流,即可实现秒杀流程,当然分布式限流也可以用到很多地方,比如限制某些IP在多久时间访问接口多少次,都可以的。

令牌桶的限流方式使得请求可以得到更加平滑的处理,不至于瞬间把系统达到最高负载。在这其中其实还有一个小细节,就是Redis的锁,单机情况下没有任何问题,如果是集群的话需要注意,一个key被hash到同一个slot的时候没有问题,如果说扩容或者缩容的话,如果key被hash到不同的slot,程序可能会出问题。

在写代码的过程中还出现了一个小问题,就是写controller的方法的时候,方法一定要声明成public的,否则自定义的注解用不了,其他service的注解直接变为空,这个问题也是找了很久才找到。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK