18

SpringCloud Gateway 动态路由【篇2终极版】基于 MySQL + 二级缓存实现

 3 years ago
source link: http://www.eknown.cn/index.php/spring-boot/spring-cloud-gateway-dynamic-routes.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

代码已上传到:https://github.com/laolunsi/spring-boot-examples,springboot -> springcloud,从入门到进阶!


上一节介绍了如何利用 nacos 的配置中心功能来实现 Gateway 动态路由,实现起来很简单,只要引入 nacos-config 的依赖,然后重写 RouteDefinitionRepository 接口即可。这种实现的缺点也很明显 —— 统一维护一个路由配置文件会对整个微服务网关的安全性造成威胁。
想象一下, 某位新来的同事不经意写错了格式,比如 json 的 } ,就会导致所有路由都不可用。这样是比较危险。


这一节介绍存储自定义路由信息到 MySQL,然后通过 Gateway 提供的方法更新缓存数据。为了提高请求速率,这里将使用一个二级缓存(内存 + Redis)的功能。


1. 创建网关服务

首先根据上一节中的 AppRoute 实体类创建一张 MySQL 表:

CREATE TABLE `app_route` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `routeId` varchar(255) NOT NULL,
  `order` int(11) DEFAULT NULL,
  `uri` varchar(255) NOT NULL,
  `predicates` text,
  `filters` text,
  `updateTime` datetime NOT NULL,
  `delete` tinyint(1) NOT NULL DEFAULT '0',
  PRIMARY KEY (`id`,`routeId`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

同上一节创建一个 parent 项目,然后在其下创建一个网关服务,添加 redis 和 mybatis 相关的依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

下面我们改造一下上一节在网关服务中的一些配置:


application.yml:

server:
  port: 8502

spring:
  application:
    name: gateway-demo222
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useSSL=false
    username: root
    password: root

  redis:
    host: localhost
    password:
    port: 6379
    database: 10

management:
  endpoints:
    web:
      exposure:
        include: health,info,gateway

mybatis:
  mapper-locations: classpath:mapper/*.xml
  configuration:
    map-underscore-to-camel-case: true

# 自定义参数
gateway:
  dynamicRoute:
    dataId: 'yq_routes'
    group: 'YQ_GATEWAY'

创建 AppRoute 的 DAO 和对应的 Mybatis SQL 语句,AppRouteDAO 如:

@Mapper
@Component
public interface AppRouteDAO {

    @Select("select * from app_route")
    List<AppRoute> findAll();

    @Select("select * from app_route where routeId = #{routeId} AND `delete` = 0 LIMIT 1")
    AppRoute findByRouteId(String routeId);

    @Select("select * from app_route where id = #{id} AND `delete` = 0")
    AppRoute findById(Integer id);

    boolean update(AppRoute route);

    boolean insert(AppRoute route);

    boolean delete(AppRoute route);

}

PS: 其它的语句在 /resources/mapper/AppRouteDAO.xml 中。

@Service
public class RouteHandler implements ApplicationEventPublisherAware, CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(RouteHandler.class);

    private ApplicationEventPublisher publisher;

    @Autowired
    private AppRouteService appRouteService;

    @Autowired
    private CacheRouteDefinitionRepository cacheRouteDefinitionRepository;

    @Autowired
    private RouteDefinitionCacheService routeDefinitionCacheService;

    @Override
    public void run(String... args) throws Exception {
        log.info("首次初始化路由....");
        this.loadRouteConfig();
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }

    public void loadRouteConfig() {
        log.info("加载路由配置...");

        Flux<RouteDefinition> definitionFlux = cacheRouteDefinitionRepository.getRouteDefinitions();
        new Thread(() -> {
            List<String> existRouteIds = definitionFlux.toStream().map(RouteDefinition::getId).collect(Collectors.toList());
            // 也可以用下面这种方法,就不需要 new Thread() 了:
            // List<String> existRouteIds = routeDefinitionCacheService.getRouteDefinitions().stream().map(RouteDefinition::getId).collect(Collectors.toList());

            List<AppRoute> appRouteList = appRouteService.findAll();
            if (appRouteList != null && appRouteList.size() > 0) {
                appRouteList.forEach(a -> {
                    if (BooleanUtils.isTrue(a.getDelete()) && existRouteIds.contains(a.getRouteId())) {
                        deleteRoute(a.getRouteId());
                    } else {
                        RouteDefinition routeDefinition = a.parseToRoute();
                        System.out.println("s: " + JSONObject.toJSONString(routeDefinition));
                        if (routeDefinition != null) {
                            cacheRouteDefinitionRepository.save(Mono.just(routeDefinition)).subscribe();
                        }
                    }
                });
            }

            this.publisher.publishEvent(new RefreshRoutesEvent(this));
        }).start();

    }

    public void deleteRoute(String routeId) {
        log.info("删除路由:" + routeId);
        cacheRouteDefinitionRepository.delete(Mono.just(routeId)).subscribe();
        this.publisher.publishEvent(new RefreshRoutesEvent(this));
    }
}

注意,这里直接用 cacheRouteDefinitionRepository.getRouteDefinitions(),在通过接口更新路由信息后调用此方法时,会出现以下异常:
       java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-3

这个问题的详细解释我还没找到,目前大概理解是 WebFlux 中的异步数据 Flux<T> 被同步的操作调用时,会抛出 blocking 异常。
解决办法

  1. new 一个线程来从 Flux<T> 中获取数据,然后执行操作
  2. 这里不使用 cacheRouteDefinitionRepository.getRouteDefinitions() 了,而是直接用routeDefinitionCacheService.getRouteDefinitions()

RouteDefinitionCacheService,该接口定义了 RouteDefinition 的本地存储和 Redis 存储,这样可以避免每一次读取路由信息都要访问数据库的问题。

@Service
public class RouteDefinitionCacheServiceImpl implements RouteDefinitionCacheService {

    /**
     * 本地缓存
     */
    private static ConcurrentHashMap<String, RouteDefinition> definitionMap = new ConcurrentHashMap<>();

    /**
     * redis 缓存地址
     */
    public static String SPACE = GatewayConfig.NACOS_DATA_ID + ":" + GatewayConfig.NACOS_GROUP_ID;

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public List<RouteDefinition> getRouteDefinitions() {
        List<RouteDefinition> list = new ArrayList<>();
        if (definitionMap.size() > 0) {
            return new ArrayList<>(definitionMap.values());
        } else {
            redisTemplate.opsForHash().values(SPACE)
                    .stream().forEach(r -> {
                        RouteDefinition route = JSONObject.parseObject(r.toString(), RouteDefinition.class);
                list.add(route);
                definitionMap.put(route.getId(), route);
            });
            return list;
        }
    }

    @Override
    public boolean saveAll(List<RouteDefinition> definitions) {
        if (definitions != null && definitions.size() > 0) {
            definitions.forEach(this::save);
            return true;
        }
        return false;
    }

    @Override
    public boolean has(String routeId) {
        return definitionMap.containsKey(routeId) ? true : redisTemplate.opsForHash().hasKey(SPACE, routeId);
    }

    @Override
    public boolean delete(String routeId) {
        if (has(routeId)) {
            definitionMap.remove(routeId);
            redisTemplate.opsForHash().delete(SPACE, routeId);
            return true;
        }
        return false;
    }

    @Override
    public boolean save(RouteDefinition r) {
        if (r != null && StringUtils.isNotBlank(r.getId())) {
            definitionMap.put(r.getId(), r);
            redisTemplate.opsForHash().put(SPACE, r.getId(), JSONObject.toJSONString(r));
            return true;
        }
        return false;
    }
}

CacheRouteDefinitionRepository,以 RouteDefinitionCacheService 为基础,是 RouteDefinitionRepository 的实现类。该类直接给 Gateway 定义了读取路由信息的方式。

@Service
public class CacheRouteDefinitionRepository implements RouteDefinitionRepository {

    @Autowired
    private RouteDefinitionCacheService cacheService;

    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        List<RouteDefinition> list = cacheService.getRouteDefinitions();
        return Flux.fromIterable(list);
    }

    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
        return route.flatMap(r -> {
            cacheService.save(r);
            return Mono.empty();
        });
    }

    @Override
    public Mono<Void> delete(Mono<String> routeId) {
        return routeId.flatMap(id -> {
           if (cacheService.has(id)) {
               cacheService.delete(id);
               return Mono.empty();
           }

           return Mono.defer(() -> Mono.error(new NotFoundException("未找到路由配置:" + routeId)));
        });
    }
}

现在我们已经定义好了 MySQL 中的 app_route 表,设计了程序中存储 RouteDefinition 的二级缓存(Redis + Mysql),下面的问题就是:如何将 MySQL 中的 app_route 和缓存结合起来?


2. 连接 Mysql 与 GateWay

首先,项目启动时,系统应该自动加载数据库中的所有路由信息:


编写一个启动类,当项目启动完成后,将初始化路由信息:

@Component
public class StartListener {

    private static final Logger logger = LoggerFactory.getLogger(StartListener.class);

    @Autowired
    private RouteDefinitionCacheService cacheService;

    @Autowired
    private AppRouteService routeService;

    @PostConstruct
    public void init() {
        logger.info("初始化路由数据...");
        List<AppRoute> routeList = routeService.findAll();
        if (routeList != null && routeList.size() > 0) {
            cacheService.saveAll(routeList.stream().map(AppRoute::parseToRoute).collect(Collectors.toList()));
        }
    }
}

然后,我们创建一个 AppRouteAction接口类,定义对路由信息的增删改查接口:

@RestController
@RequestMapping(value = "app/route")
public class AppRouteAction {
    @Autowired
    private AppRouteService appRouteService;

    @GetMapping(value = "list")
    public JsonResult list() {
        JsonResult jsonResult = new JsonResult(true);
        jsonResult.put("routeList", appRouteService.findAll());
        return jsonResult;
    }

    @PostMapping(value = "")
    public JsonResult save(AppRoute route) {
        if (route == null || StringUtils.isBlank(route.getRouteId())) {
            return new JsonResult(false, "id不能为空");
        } else if (StringUtils.isBlank(route.getUri())) {
            return new JsonResult(false, "uri不能为空");
        }

        AppRoute oldRoute = null;
        if (route.getId() != null) {
            oldRoute = appRouteService.findById(route.getId());
            if (oldRoute == null || oldRoute.getId() == null) {
                return new JsonResult(false, "数据不存在或已被删除");
            }
        }

        AppRoute sameRouteIdObj = appRouteService.findByRouteId(route.getRouteId());
        if (sameRouteIdObj != null && sameRouteIdObj.getId() != null) {
            if (route.getId() == null) {
                return new JsonResult(false, "已存在相同 RouteId 的配置");
            }
        }
        route.setPredicates(route.getPredicates() != null ? route.getPredicates().trim() : null);
        route.setFilters(route.getFilters() != null ? route.getFilters().trim() : null);

        boolean res = appRouteService.saveOrUpdate(route);
        return new JsonResult(res, res ? "操作成功" : "操作失败");
    }

    @DeleteMapping(value = "{routeId}")
    public JsonResult delete(@PathVariable("routeId") String routeId) {
        AppRoute route = appRouteService.findByRouteId(routeId);
        if (route == null || StringUtils.isBlank(route.getRouteId())) {
            return new JsonResult(false, "路由不存在");
        }

        boolean res = appRouteService.delete(route);
        return new JsonResult(res, res ? "操作成功" : "操作失败");
    }
}

在路由信息被改变时,程序将通过上面的 RouteHandler 去通知更新路由信息,具体的逻辑代码在 AppRouteService 中:

@Service
public class AppRouteServiceImpl implements AppRouteService {

    private static final Logger logger = LoggerFactory.getLogger(AppRouteService.class);

    @Autowired
    private AppRouteDAO appRouteDAO;

    @Autowired
    private RouteDefinitionCacheService cacheService;

    @Autowired
    private RouteHandler routeHandler;

    @Override
    public List<AppRoute> findAll() {
        return appRouteDAO.findAll();
    }

    @Override
    public boolean saveOrUpdate(AppRoute route) {
        route.setUpdateTime(new Date());
        AppRoute oldRoute = appRouteDAO.findById(route.getId());
        boolean res = false;
        if (oldRoute != null && oldRoute.getId() != null) {
            res = appRouteDAO.update(route);
        } else {
            res = appRouteDAO.insert(route);
        }

        if (res) {
            logger.info("更新缓存,通知网关重新加载路由信息...");
            cacheService.save(route.parseToRoute());
            routeHandler.loadRouteConfig();
        }

        return res;
    }

    @Override
    public boolean delete(AppRoute route) {
        route.setUpdateTime(new Date());
        boolean res = appRouteDAO.delete(route);
        if (res) {
            logger.info("更新缓存,通知网关重新加载路由信息...");
            cacheService.save(route.parseToRoute());
            routeHandler.loadRouteConfig();
        }
        return res;
    }

    @Override
    public AppRoute findByRouteId(String routeId) {
        return appRouteDAO.findByRouteId(routeId);
    }

    @Override
    public AppRoute findById(Integer id) {
        return appRouteDAO.findById(id);
    }
}

到此为止,一个完整的动态路由网关项目已经搭建完毕了,具体的代码请查看:https://github.com/laolunsi/spring-boot-examples 下面我们测试一下。


同上一节创建一个简单的测试服务,取名为 demo,在数据库添加对应配置,如:

INSERT INTO `app_route` 
VALUES 
('1', 'demo', '8003', 'lb://demo', '[{\"name\":\"Path\",\"args\":{\"pattern\":\"/api/demo2/**\"}}]', '[{\"name\":\"StripPrefix\",\"args\":{\"parts\":\"2\"}}]', '2020-08-09 20:54:39', '0');

启动 redis、nacos、网关服务,我们可以看到项目启动后加载了路由信息:

filefile

下面启动 demo 服务,测试一下路由是否正常:

file
file

下面测试一下路由信息被修改后,网关服务是否会自动更新路由信息。
通过刚才编写的接口,修改 demo 服务的路由配置,如:

file
file

我们可以看到网关服务的控制台出现:

file
file

说明新的路由信息被加载了。
测试旧的地址发现 404,然后根据新的配置访问接口:
filefile

好了,本节内容就到此为止啦!
这两篇文章分别介绍了基于 Nacos 和 基于 mysql 来实现的动态路由功能,而这篇文章更提供了一个完整的路由信息管理的实现,想来应付业务场景已经足够了。


个人水平有限,如文章有错误之处还请指正。有疑问也可以联系我,希望对你有所帮助!
thanks.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK