SpringCloud Gateway 动态路由【篇2终极版】基于 MySQL + 二级缓存实现
source link: http://www.eknown.cn/index.php/spring-boot/spring-cloud-gateway-dynamic-routes.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
代码已上传到:https://github.com/laolunsi/spring-boot-examples,springboot -> springcloud,从入门到进阶!
上一节介绍了如何利用 nacos 的配置中心功能来实现 Gateway 动态路由,实现起来很简单,只要引入 nacos-config 的依赖,然后重写 RouteDefinitionRepository 接口即可。这种实现的缺点也很明显 —— 统一维护一个路由配置文件会对整个微服务网关的安全性造成威胁。
想象一下, 某位新来的同事不经意写错了格式,比如 json 的 }
,就会导致所有路由都不可用。这样是比较危险。
这一节介绍存储自定义路由信息到 MySQL,然后通过 Gateway 提供的方法更新缓存数据。为了提高请求速率,这里将使用一个二级缓存(内存 + Redis)的功能。
1. 创建网关服务
首先根据上一节中的 AppRoute 实体类创建一张 MySQL 表:
CREATE TABLE `app_route` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`routeId` varchar(255) NOT NULL,
`order` int(11) DEFAULT NULL,
`uri` varchar(255) NOT NULL,
`predicates` text,
`filters` text,
`updateTime` datetime NOT NULL,
`delete` tinyint(1) NOT NULL DEFAULT '0',
PRIMARY KEY (`id`,`routeId`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
同上一节创建一个 parent 项目,然后在其下创建一个网关服务,添加 redis 和 mybatis 相关的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
下面我们改造一下上一节在网关服务中的一些配置:
application.yml:
server:
port: 8502
spring:
application:
name: gateway-demo222
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?useSSL=false
username: root
password: root
redis:
host: localhost
password:
port: 6379
database: 10
management:
endpoints:
web:
exposure:
include: health,info,gateway
mybatis:
mapper-locations: classpath:mapper/*.xml
configuration:
map-underscore-to-camel-case: true
# 自定义参数
gateway:
dynamicRoute:
dataId: 'yq_routes'
group: 'YQ_GATEWAY'
创建 AppRoute 的 DAO 和对应的 Mybatis SQL 语句,AppRouteDAO 如:
@Mapper
@Component
public interface AppRouteDAO {
@Select("select * from app_route")
List<AppRoute> findAll();
@Select("select * from app_route where routeId = #{routeId} AND `delete` = 0 LIMIT 1")
AppRoute findByRouteId(String routeId);
@Select("select * from app_route where id = #{id} AND `delete` = 0")
AppRoute findById(Integer id);
boolean update(AppRoute route);
boolean insert(AppRoute route);
boolean delete(AppRoute route);
}
PS: 其它的语句在 /resources/mapper/AppRouteDAO.xml 中。
@Service
public class RouteHandler implements ApplicationEventPublisherAware, CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(RouteHandler.class);
private ApplicationEventPublisher publisher;
@Autowired
private AppRouteService appRouteService;
@Autowired
private CacheRouteDefinitionRepository cacheRouteDefinitionRepository;
@Autowired
private RouteDefinitionCacheService routeDefinitionCacheService;
@Override
public void run(String... args) throws Exception {
log.info("首次初始化路由....");
this.loadRouteConfig();
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
public void loadRouteConfig() {
log.info("加载路由配置...");
Flux<RouteDefinition> definitionFlux = cacheRouteDefinitionRepository.getRouteDefinitions();
new Thread(() -> {
List<String> existRouteIds = definitionFlux.toStream().map(RouteDefinition::getId).collect(Collectors.toList());
// 也可以用下面这种方法,就不需要 new Thread() 了:
// List<String> existRouteIds = routeDefinitionCacheService.getRouteDefinitions().stream().map(RouteDefinition::getId).collect(Collectors.toList());
List<AppRoute> appRouteList = appRouteService.findAll();
if (appRouteList != null && appRouteList.size() > 0) {
appRouteList.forEach(a -> {
if (BooleanUtils.isTrue(a.getDelete()) && existRouteIds.contains(a.getRouteId())) {
deleteRoute(a.getRouteId());
} else {
RouteDefinition routeDefinition = a.parseToRoute();
System.out.println("s: " + JSONObject.toJSONString(routeDefinition));
if (routeDefinition != null) {
cacheRouteDefinitionRepository.save(Mono.just(routeDefinition)).subscribe();
}
}
});
}
this.publisher.publishEvent(new RefreshRoutesEvent(this));
}).start();
}
public void deleteRoute(String routeId) {
log.info("删除路由:" + routeId);
cacheRouteDefinitionRepository.delete(Mono.just(routeId)).subscribe();
this.publisher.publishEvent(new RefreshRoutesEvent(this));
}
}
注意,这里直接用 cacheRouteDefinitionRepository.getRouteDefinitions(),在通过接口更新路由信息后调用此方法时,会出现以下异常:
java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-3这个问题的详细解释我还没找到,目前大概理解是 WebFlux 中的异步数据 Flux<T> 被同步的操作调用时,会抛出 blocking 异常。
解决办法
- new 一个线程来从 Flux<T> 中获取数据,然后执行操作
- 这里不使用 cacheRouteDefinitionRepository.getRouteDefinitions() 了,而是直接用routeDefinitionCacheService.getRouteDefinitions()
RouteDefinitionCacheService,该接口定义了 RouteDefinition 的本地存储和 Redis 存储,这样可以避免每一次读取路由信息都要访问数据库的问题。
@Service
public class RouteDefinitionCacheServiceImpl implements RouteDefinitionCacheService {
/**
* 本地缓存
*/
private static ConcurrentHashMap<String, RouteDefinition> definitionMap = new ConcurrentHashMap<>();
/**
* redis 缓存地址
*/
public static String SPACE = GatewayConfig.NACOS_DATA_ID + ":" + GatewayConfig.NACOS_GROUP_ID;
@Autowired
private RedisTemplate redisTemplate;
@Override
public List<RouteDefinition> getRouteDefinitions() {
List<RouteDefinition> list = new ArrayList<>();
if (definitionMap.size() > 0) {
return new ArrayList<>(definitionMap.values());
} else {
redisTemplate.opsForHash().values(SPACE)
.stream().forEach(r -> {
RouteDefinition route = JSONObject.parseObject(r.toString(), RouteDefinition.class);
list.add(route);
definitionMap.put(route.getId(), route);
});
return list;
}
}
@Override
public boolean saveAll(List<RouteDefinition> definitions) {
if (definitions != null && definitions.size() > 0) {
definitions.forEach(this::save);
return true;
}
return false;
}
@Override
public boolean has(String routeId) {
return definitionMap.containsKey(routeId) ? true : redisTemplate.opsForHash().hasKey(SPACE, routeId);
}
@Override
public boolean delete(String routeId) {
if (has(routeId)) {
definitionMap.remove(routeId);
redisTemplate.opsForHash().delete(SPACE, routeId);
return true;
}
return false;
}
@Override
public boolean save(RouteDefinition r) {
if (r != null && StringUtils.isNotBlank(r.getId())) {
definitionMap.put(r.getId(), r);
redisTemplate.opsForHash().put(SPACE, r.getId(), JSONObject.toJSONString(r));
return true;
}
return false;
}
}
CacheRouteDefinitionRepository,以 RouteDefinitionCacheService 为基础,是 RouteDefinitionRepository 的实现类。该类直接给 Gateway 定义了读取路由信息的方式。
@Service
public class CacheRouteDefinitionRepository implements RouteDefinitionRepository {
@Autowired
private RouteDefinitionCacheService cacheService;
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
List<RouteDefinition> list = cacheService.getRouteDefinitions();
return Flux.fromIterable(list);
}
@Override
public Mono<Void> save(Mono<RouteDefinition> route) {
return route.flatMap(r -> {
cacheService.save(r);
return Mono.empty();
});
}
@Override
public Mono<Void> delete(Mono<String> routeId) {
return routeId.flatMap(id -> {
if (cacheService.has(id)) {
cacheService.delete(id);
return Mono.empty();
}
return Mono.defer(() -> Mono.error(new NotFoundException("未找到路由配置:" + routeId)));
});
}
}
现在我们已经定义好了 MySQL 中的 app_route 表,设计了程序中存储 RouteDefinition 的二级缓存(Redis + Mysql),下面的问题就是:如何将 MySQL 中的 app_route 和缓存结合起来?
2. 连接 Mysql 与 GateWay
首先,项目启动时,系统应该自动加载数据库中的所有路由信息:
编写一个启动类,当项目启动完成后,将初始化路由信息:
@Component
public class StartListener {
private static final Logger logger = LoggerFactory.getLogger(StartListener.class);
@Autowired
private RouteDefinitionCacheService cacheService;
@Autowired
private AppRouteService routeService;
@PostConstruct
public void init() {
logger.info("初始化路由数据...");
List<AppRoute> routeList = routeService.findAll();
if (routeList != null && routeList.size() > 0) {
cacheService.saveAll(routeList.stream().map(AppRoute::parseToRoute).collect(Collectors.toList()));
}
}
}
然后,我们创建一个 AppRouteAction接口类,定义对路由信息的增删改查接口:
@RestController
@RequestMapping(value = "app/route")
public class AppRouteAction {
@Autowired
private AppRouteService appRouteService;
@GetMapping(value = "list")
public JsonResult list() {
JsonResult jsonResult = new JsonResult(true);
jsonResult.put("routeList", appRouteService.findAll());
return jsonResult;
}
@PostMapping(value = "")
public JsonResult save(AppRoute route) {
if (route == null || StringUtils.isBlank(route.getRouteId())) {
return new JsonResult(false, "id不能为空");
} else if (StringUtils.isBlank(route.getUri())) {
return new JsonResult(false, "uri不能为空");
}
AppRoute oldRoute = null;
if (route.getId() != null) {
oldRoute = appRouteService.findById(route.getId());
if (oldRoute == null || oldRoute.getId() == null) {
return new JsonResult(false, "数据不存在或已被删除");
}
}
AppRoute sameRouteIdObj = appRouteService.findByRouteId(route.getRouteId());
if (sameRouteIdObj != null && sameRouteIdObj.getId() != null) {
if (route.getId() == null) {
return new JsonResult(false, "已存在相同 RouteId 的配置");
}
}
route.setPredicates(route.getPredicates() != null ? route.getPredicates().trim() : null);
route.setFilters(route.getFilters() != null ? route.getFilters().trim() : null);
boolean res = appRouteService.saveOrUpdate(route);
return new JsonResult(res, res ? "操作成功" : "操作失败");
}
@DeleteMapping(value = "{routeId}")
public JsonResult delete(@PathVariable("routeId") String routeId) {
AppRoute route = appRouteService.findByRouteId(routeId);
if (route == null || StringUtils.isBlank(route.getRouteId())) {
return new JsonResult(false, "路由不存在");
}
boolean res = appRouteService.delete(route);
return new JsonResult(res, res ? "操作成功" : "操作失败");
}
}
在路由信息被改变时,程序将通过上面的 RouteHandler 去通知更新路由信息,具体的逻辑代码在 AppRouteService 中:
@Service
public class AppRouteServiceImpl implements AppRouteService {
private static final Logger logger = LoggerFactory.getLogger(AppRouteService.class);
@Autowired
private AppRouteDAO appRouteDAO;
@Autowired
private RouteDefinitionCacheService cacheService;
@Autowired
private RouteHandler routeHandler;
@Override
public List<AppRoute> findAll() {
return appRouteDAO.findAll();
}
@Override
public boolean saveOrUpdate(AppRoute route) {
route.setUpdateTime(new Date());
AppRoute oldRoute = appRouteDAO.findById(route.getId());
boolean res = false;
if (oldRoute != null && oldRoute.getId() != null) {
res = appRouteDAO.update(route);
} else {
res = appRouteDAO.insert(route);
}
if (res) {
logger.info("更新缓存,通知网关重新加载路由信息...");
cacheService.save(route.parseToRoute());
routeHandler.loadRouteConfig();
}
return res;
}
@Override
public boolean delete(AppRoute route) {
route.setUpdateTime(new Date());
boolean res = appRouteDAO.delete(route);
if (res) {
logger.info("更新缓存,通知网关重新加载路由信息...");
cacheService.save(route.parseToRoute());
routeHandler.loadRouteConfig();
}
return res;
}
@Override
public AppRoute findByRouteId(String routeId) {
return appRouteDAO.findByRouteId(routeId);
}
@Override
public AppRoute findById(Integer id) {
return appRouteDAO.findById(id);
}
}
到此为止,一个完整的动态路由网关项目已经搭建完毕了,具体的代码请查看:https://github.com/laolunsi/spring-boot-examples 下面我们测试一下。
同上一节创建一个简单的测试服务,取名为 demo,在数据库添加对应配置,如:
INSERT INTO `app_route`
VALUES
('1', 'demo', '8003', 'lb://demo', '[{\"name\":\"Path\",\"args\":{\"pattern\":\"/api/demo2/**\"}}]', '[{\"name\":\"StripPrefix\",\"args\":{\"parts\":\"2\"}}]', '2020-08-09 20:54:39', '0');
启动 redis、nacos、网关服务,我们可以看到项目启动后加载了路由信息:
下面启动 demo 服务,测试一下路由是否正常:
下面测试一下路由信息被修改后,网关服务是否会自动更新路由信息。
通过刚才编写的接口,修改 demo 服务的路由配置,如:
file
我们可以看到网关服务的控制台出现:
file
说明新的路由信息被加载了。
测试旧的地址发现 404,然后根据新的配置访问接口:
file
好了,本节内容就到此为止啦!
这两篇文章分别介绍了基于 Nacos 和 基于 mysql 来实现的动态路由功能,而这篇文章更提供了一个完整的路由信息管理的实现,想来应付业务场景已经足够了。
个人水平有限,如文章有错误之处还请指正。有疑问也可以联系我,希望对你有所帮助!
thanks.
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK