SpringCloud


SpringCloud

微服务是一种软件架构风格,它是以专注于单一职责的很多小型项目为基础,组合出复杂的大型应用。

课程在线文档戳我访问

Docker

Docker 是一个快速构建、运行、管理应用的工具。

快速入门

镜像和容器

当我们利用 Docker 安装应用时,Docker 会自动搜索并下载应用镜像(image)。镜像不仅包含应用本身,还包含应用运行所需要的环境、配置、系统函数库。Docker 会在运行镜像时创建一个隔离环境,称为容器(container)。

镜像仓库:存储和管理镜像的平台,Docker 官方维护了一个公共仓库:Docker Hub

Docker 部署 MySQL

docker run -d --name mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123 mysql
  1. docker run :run 是 docker 下的子命令,意为 “创建并运行一个容器”-d 是让容器在后台运行。
  2. --name [name]:给容器起一个名字,必须唯一。
  3. -p 3306:3306:设置端口映射(前面是宿主机端口,后面是容器内端口)。容器是隔离的环境,有自己独立的内存空间、独立的文件系统、独立的网络空间……当然,容器的网络空间对外是不可访问的,所以我们只能通过访问宿主机,来简介访问容器的环境,这就需要设置端口映射。
  4. -e KEY=VALUE:配置环境变量,由镜像决定的,不同镜像的 key 和 value 不一致。
  5. mysql:指定运行的镜像名字。完整写法:[repository]:[tag],其中 repository 就是镜像名、tag 是镜像版本。

Docker 基础

常见命令

Docker 最常见的就是操作镜像、容器的命令。详情见官方文档

  • docker pull:拉取镜像仓库中的镜像。
  • docker push:把镜像推送到镜像仓库中。
  • docker images:查看当前镜像。
  • docker rmi:删除镜像。
  • docker build:利用 DockerFile 构建镜像。
  • docker save:把自己打包好的镜像整合成文件包。
  • docker load:加载已经打包好的镜像文件包。
  • docker run:创建并运行一个容器。
  • docker stop:停止运行中的容器。(停止的是容器中的进程,但是容器依旧存在)
  • docker start:启动指定容器,不会创建容器。
  • docker ps:查看当前容器运行状态。
  • docker rm:删除容器。
  • docker logs:查看运行日志。
  • docker exec:进入到容器内部。

数据卷

大部分镜像只是对应用运行需要的环境进行一个最小化的打包,不会打包其他的命令(如 llvi 等)。所以,在容器中,要去修改文件十分困难,更不要说进行文件的拷贝等复杂操作了。

数据卷(volume)是一个虚拟目录,是容器内目录与宿主机目录之间映射的桥梁。只要是在 linux 系统中,数据卷的位置就处于 /var/lib/docker/volumes,从容器中映射出来的文件就需要通过数据卷间接地挂载到这个目录下。

  • docker volume create:创建数据卷。
  • docker volume ls:查看所有数据卷。
  • docker volume rm:删除指定数据卷。
  • docker volume inspect:查看某个数据卷的详情。
  • docker volume prune:清除数据卷。

在执行 docker run 命令时,使用 -v 数据卷:容器内目录 可以完成数据卷挂载。当创建容器时,如果挂载了数据卷且数据卷不存在,会自动创建数据卷。注意:容器如果创建了,就无法再挂载数据卷,挂载数据卷的操作只有在创建容器的时候才能进行。并且,删除容器后,如果容器之前已经挂载过数据卷,则数据卷依旧会保存下来。

在执行 docker run 命令时,使用 -v 本地目录:容器内目录 可以完成本地目录挂载。本地目录必须以 / 或者 ./ 开头,如果直接以名称开头,则会被识别为数据卷而并非本地目录。

在创建容器时,有时候也会自动生成一些数据卷并对镜像文件当中的数据文件进行挂载。这种自动生成的数据卷我们称为匿名卷。匿名卷的名字是随机生成的,十分长,阅读性极差。

自定义镜像

镜像就是包含了应用程序、程序运行的系统函数库、运行配置等文件的文件包。构建镜像的过程其实就是把上述文件打包的过程。

例如,我们如果要构建一个 Java 镜像,我们需要:

  1. 准备一个 Linux 运行环境。
  2. 安装 JRE 并配置环境变量。
  3. 拷贝 jar 包。
  4. 编写运行脚本。

上述的每一次操作都称之为(Layer)。添加安装包、依赖、配置等每一次操作都会形成新的一层。分层的用处便是共享一些基础的层(BaseImage,基础镜像,比如应用依赖的系统函数库、环境、配置、文件等),避免重复的基础打包操作。而在下载的时候,镜像文件也是分层下载的,一些基础的层在本地存在的话,也就无需再次下载了。而镜像的结构便是由入口(Entrypoint,镜像运行入口,一般是程序启动的脚本和参数)、层、基础镜像组成的。

而自定义镜像便需要使用到 Dockerfile,Dockerfile 就是一个文本文件,其中包含一个个的指令(Instruction),用指令来说明要执行什么操作来构建镜像。将来 Docker 可以根据 Dockerfile 来帮我们构建镜像,常见指令如下:

指令 说明 示例
FROM 指定基础镜像 FROM centos:6
ENV 设置环境变量,可在后面指令使用 ENV key value
COPY 拷贝本地文件到镜像的指定目录 COPY ./jrell.tar.gz /tmp
RUN 执行 Linux 的 shell 命令,一般是安装过程的命令 RUN tar ...
EXPOSE 指定容器运行时监听的端口,是给镜像使用者看的 EXPOSE 8080
ENTRYPOINT 镜像中应用的启动命令,容器运行时调用 ENTRYPOINT java -jar xx.jar

示例:

# 基础镜像(已经包含系统文件)
FROM openjdk:11.0-jre-buster
# 拷贝jar包
COPY docker-demo.jar /app.jar
# 入口
ENTRYPOINT ["java", "-jar", "/app.jar"]

当编写好了 Dockerfile 后,可以用下面的命令来构建镜像:

docker build -t myImage:1.0 .
  • -t 是给镜像起名,格式依然是 repository:tag 的格式。
  • . 是指 Dockerfile 所在目录,如果就在当前目录,则指定为 .

容器网络互连

默认情况下,所有容器都是以 bridge 方式连接到 Docker 的一个虚拟网桥上。安装 Docker 的那一刻, Docker 便会在虚拟机中创建一张虚拟的网卡,这张网卡默认名叫 docker0,并且会创建一个虚拟网桥。网卡的地址范围 172.17.0.1/16

当你创建一个容器后,就会和这个网卡创建一个桥接,然后被分配到一个 ip 地址。但是这个 ip 地址并不是固定的,一个容器被分配到的 ip 是随机的。加入自定义网络的容器才可以通过容器名互相访问,Docker 的网络操作命令如下:

命令 说明
docker network create 创建一个网络
docker netword ls 查看所有网络
docker network rm 删除指定网络
docker network prune 清除未使用的网络
docker network connect 使指定容器连接加入某网络
docker network disconnect 使指定容器连接离开某网络
docker network inspect 查看网络详细信息

docker run 的时候,通过 --network [networkname] 可以直接连接指定网卡网桥。

DockerCompose

Docker Compose 通过一个单独的 docker-compose.yml 模板文件(YAML格式)来定义一组相关联的应用容器,帮助我们实现多个相互关联的 Docker 容器的快速部署。(例如前后端项目的快速部署)

yml 配置文件模板如下:

version: "3.8"

services:
  containerA:
  	image: A
  	container_name: A
  	ports:
  	  - "11:11"
  containerB:
    image: B
    container_name: B
    ports:
      - "22:22"
  containerC:
    image: C
    container_name: C
    ports:
      - "33:33"

例如,部署的命令如下:

docker run -d \
  --name mysql \
  -p 3306:3306 \
  -e TZ=Asia/Shanghai \
  -e MYSQL_ROOT_PASSWORD=123 \
  -v ./mysql/data:/var/lib/mysql \
  -v ./mysql/conf:/etc/mysql/conf.d \
  -v ./mysql/init:/docker-entrypoint-initdb.d \
  --network hmall
  mysql

对应的 yml 文件为:

version: "3.8"
services:
  mysql:
    image: mysql
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      TZ: Asia/Shanghai
      MYSQL_ROOT_PASSWORD: 123
    volumes:
      - "./mysql/data:/var/lib/mysql"
      - "./mysql/conf:/etc/mysql/conf.d"
      - "./mysql/init:/docker-entrypoint-initdb.d"
    network:
      - hmall

DockerCompose 命令如下:

docker compose [options] [command]

Options:

  • -f:指定 compose 文件的路径和名称。
  • -p:指定 project 名称。
  • -d:后台运行。

Command:

  • up:创建并启动所有 service 容器。
  • down:停止并溢出所有容器、网络。
  • ps:列出所有启动的容器。
  • logs:查看指定容器的日志。
  • stop:停止容器。
  • start:启动容器。
  • restart:重启容器。
  • top:查看运行的进程。
  • exec:在指定的运行中容器中执行命令。

微服务

认识微服务

  • 单体架构:将业务的所有功能集中在一个项目中开发,打成一个包部署。优点在于架构简单,部署成本低。但是缺点就是团队协作成本高,系统发布效率低,系统可用性差。总的来说,单体架构适合开发功能相对简单,规模较小的项目。

  • 微服务:微服务架构,是服务化思想指导下的一套最佳实践架构方案。服务化,就是把单体架构中的功能模块拆分为多个独立项目,各个独立的项目部署到独立的 tomcat 上,并且每个服务间的数据库也是隔离的。对服务进行拆分的时候要注意拆分粒度应该小、服务开发团队自治,服务自治。

  • SpringCloud:SpringCloud 是目前国内使用最广泛的微服务框架。SpringCloud 集成了各种微服务功能组件,并基于 SpringBoot 实现了这些组件的自动装配,从而提供了良好的开箱即用体验。

微服务拆分

微服务的拆分原则如下:

  • 创业型项目:先采用单体架构,快速开发,快速试错。随着规模扩大,逐渐拆分。
  • 确定的大型项目:资金充足,目标明确,可以直接选择微服务架构,避免后续拆分麻烦。

从拆分目标来说,要做到:

  • 高内聚:每隔微服务的职责要尽量单一,包含的业务相互关联度高,完整度高。
  • 低耦合:每隔微服务的功能要相对独立,尽量减少对其他微服务的依赖。

从拆分方式来说,一般包含两种方式:

  • 纵向拆分:按照业务模块来拆分。
  • 横向拆分:抽取公共服务,提高复用性。

拆分的工程结构有两种:

  • 独立 Project:将每一个微服务都创建成一个独立的 project,每一个独立的 project 都有一个独立的仓库。可以有效降低耦合度,但是管理起来较为复杂。
  • Maven 聚合:利用父级 Maven 工程来管理子级 Maven 微服务工程,适合中小型项目应用。

本次进行拆分示例的商城项目分为五个模块:用户模块、商品模块、购物车模块、订单模块、支付模块

拆分商品服务

步骤如下:

  1. 在父级 Maven 中,新建 “模块”,左侧选择 “Java”,然后使用 Maven 构建,创建一个新的 Maven 子工程。
  2. 对子级 Maven 的 pom.xml 文件进行进一步的修改,引入必要的依赖。
  3. 接着,新建项目的启动类、各种包、以及配置文件(注意配置文件中的 spring.application.name 需要更改!)。
  4. 每一个项目为了实现微服务的数据独立性,所以需要部署多台数据库,可以在 docker 中创建多个容器来运行。(为了方便,本次示例采用不同的 database 来代替,一个微服务对应一个 database。)
  5. 拷贝原单体架构中的各种类,优先拷贝实体类(保证后续拷贝不报错),然后再拷贝 mapper、service、controller。
  6. 最后在 IDEA 中,按下 alt + 8 设置启动类的有效配置文件为 local(右侧 Maven 工具刷新依赖即可刷新启动类)。

远程调用

在进行服务拆分时,有时候会出现如下问题:在单体架构中,一个服务需要依赖另一个服务的数据库内容和 Service 层、Mapper 层等等。但是,在微服务的拆分模式下,将会导致二者的隔离,部分代码无法使用。为了解决这种问题,我们就需要引入远程调用的思想。

远程调用的实现大致如下:假设微服务 A 和微服务 B 是两个独立的服务,现在服务 A 需要从服务 B 中拿取数据,就可以向服务 B 发送请求,服务 B 接收到请求后,处理数据,再把结果返回给服务 A,这样就可以实现两个独立服务的相互数据传输了。

Spring 给我们提供了一个 RestTemplate 工具,可以方便地实现 Http 请求的发送。使用步骤如下:

  1. 注入 RestTemplate 到 Spring 容器:

    // 以下这段简单的配置代码可以写到配置类中
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
  2. 发起远程调用:

    public <T> ResponseEntity<T> exchange(
    	String url,	// 请求路径
        HttpMethod method,	// 请求方式
        @Nullable HttpEntity<?> requestEntity,	// 请求实体,可以为空
        Class<T> responseType,	// 返回值类型
        Map<String, ?> uriVariables	// 请求参数
    )

示例:

@Resource
private RestTemplate restTemplate;

// 查询购物车列表
private void handleCartItems(List<CartVO> vos) {
    // 1.获取商品id
    Set<Long> itemIds = vos.stream().map(CartVO::getItemId).collect(Collectors.toSet());
    
    // 2.查询商品
    
    // 利用itemService查询数据,在微服务下已经不适用
    // List<ItemDTO> items = itemService.queryItemByIds(itemIds);
    
    // 2.1.利用RestTemplate发起http请求,得到http响应
    ResponseEntity<List<ItemDTO>> response = restTemplate.exchange(
            "http://localhost:8024/items?ids={ids}", // {}占位符
            HttpMethod.GET,
            null,
            // 需要返回List时候,这里要填上ParameterizedTypeReference的一个对象
            new ParameterizedTypeReference<List<ItemDTO>>() {},
            Map.of("ids", CollUtils.join(itemIds, ","))
    );
    
    // 2.2.解析响应
    if (!response.getStatusCode().is2xxSuccessful()) {
        // 查询失败,直接结束
        return;
    }
    List<ItemDTO> items = response.getBody();
    
    //...得到items后对items的业务代码...
}

服务治理

我们在上一小节使用了 RedisTemplate 进行远程调用,实现了不同服务之间的访问。但是,远程调用也存在一定的问题,当我们单独其中一个服务压力比较大的时候,可能会部署多份,形成一个负载均衡的集群。这样一来,exchange 函数在发起请求的时候,请求 url 是不好填写的,涉及到访问路径和访问端口,实际上根本就很难去进行动态填写。

注册中心原理

在服务治理中,向别人发起请求请求数据的,称为服务调用者;向别人提供数据的,我们称为服务提供者。任何一个微服务都可以作为服务调用者或者服务提供者。

为了让各种微服务的作用可以更加方便管理,这个时候,我们就需要一个注册中心来管理各种各样的服务信息。服务提供者需要在注册中心进行注册,而服务调用者就可以在注册中心订阅各种服务的信息了。

为了保证服务提供者的部署安全性,服务提供者还需要和注册中心实现心跳检测操作,确保服务提供者提供的服务稳定安全。当注册中心通过心跳检测检测不到某个已经宕机的服务提供者,就会直接把这个服务提供者从列表中剔除,并且通知服务调用者不要去访问这个已经宕机的服务提供者,保证了数据访问的安全性。

Nacos 注册中心

Nacos 是目前国内企业中占比最多的注册中心组件。它是阿里巴巴的产品,目前已经加入 Spring Cloud Alibaba中。

由于我们基于 Docker 来部署 Nacos 注册中心,所以需要准备 MySQL 的数据表,用来存储 Nacos 的数据,具体的 sql 文件戳这里访问

./nacos/custom.env 中准备如下配置:

PREFER_HOST_MODE=hostname
MODE=standalone
SPRING_DATASOURCE_PLATFORM=mysql
# 以下为虚拟机的ip
MYSQL_SERVICE_HOST=192.168.23.130
MYSQL_SERVICE_DB_NAME=nacos
MYSQL_SERVICE_PORT=3306
MYSQL_SERVICE_USER=root
MYSQL_SERVICE_PASSWORD=123
MYSQL_SERVICE_DB_PARAM=characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai

使用以下命令来安装和启动 Nacos:

docker run -d --name nacos --env-file ./nacos/custom.env -p 8848:8848 -p 9848:9848 -p 9849:9849 --restart=always nacos/nacos-server:v2.1.0-slim

之后,便可以通过 虚拟机ip:8848/nacos 访问到 nacos 注册中心,使用 users 表中的账号和密码来进行登录。

启动时先利用 docker logs nacos 看看 nacos 的启动情况,确保正确连接上 mysql。

服务注册

服务注册步骤如下:

  1. 引入 nacos discovery 依赖:

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
  2. 配置 Nacos 地址:

    spring:
      application:
        name: item-service	# 服务名称
      cloud:
        nacos:
          server-addr: 192.168.23.130:8848	# nacos地址

这样一来,就可以把服务提供者注册到 nacos 注册中心里面了。

服务发现和负载均衡

上一小节我们通过配置文件,把服务提供者配置到了 nacos 中,接下来,我们就要把服务调用者配到 nacos 中。

  1. 引入 nacos discovery 依赖。(同上)

  2. 配置 Nacos 地址。(同上)

  3. 服务发现:

    @Resource
    private DiscoveryClient discoveryClient;
    
    private void handleCartItems(List<CartVO> vos) {
        // 1.获取商品id
        Set<Long> itemIds = vos.stream().map(CartVO::getItemId).collect(Collectors.toSet());
        
        // 2.查询商品
        // 2.1.根据服务名称获取服务实例列表
        List<ServiceInstance> instances = discoveryClient.getInstances("item-service");
        if (CollUtil.isEmpty(instances)) {
            return;
        }
        
        // 2.2.负载均衡,从实例列表中挑选一个实例
        ServiceInstance instance = instances.get(RandomUtil.randomInt(instances.size()));
        URI uri = instance.getUri();    // 获取ip+port
        
        // 2.3.利用RestTemplate发起http请求,得到http响应
        ResponseEntity<List<ItemDTO>> response = restTemplate.exchange(
                uri + "/items?ids={ids}", // 这里不再是硬编码了
                HttpMethod.GET,
                null,
                new ParameterizedTypeReference<>() {
                },
                Map.of("ids", CollUtils.join(itemIds, ","))
        );
        
        // 2.4.解析响应
        if (!response.getStatusCode().is2xxSuccessful()) {
            // 查询失败,直接结束
            return;
        }
        List<ItemDTO> items = response.getBody();
        
        //...得到items后对items的业务代码...
    }

OpenFeign

快速入门

OpenFeign 是一个声明式的 http 客户端,是 SpringCloud 在 Eureka 公司开源的 Feign 基础上改造而来。其作用就是基于 SpringMVC 的常见注解,帮我们优雅的实现 http 请求的发送。

我们在上一节中,使用了一段核心代码来进行远程调用:

// 2.1.根据服务名称获取服务实例列表
List<ServiceInstance> instances = discoveryClient.getInstances("item-service");
if (CollUtil.isEmpty(instances)) {
    return;
}

// 2.2.负载均衡,从实例列表中挑选一个实例
ServiceInstance instance = instances.get(RandomUtil.randomInt(instances.size()));
URI uri = instance.getUri();    // 获取ip+port

// 2.3.利用RestTemplate发起http请求,得到http响应
ResponseEntity<List<ItemDTO>> response = restTemplate.exchange(
        uri + "/items?ids={ids}", // 这里不再是硬编码了
        HttpMethod.GET,
        null,
        new ParameterizedTypeReference<>() {
        },
        Map.of("ids", CollUtils.join(itemIds, ","))
);

OpenFeign 旨在对上述代码进行封装,帮助我们简化操作:

  1. 引入依赖:

    <!-- OpenFeign -->
    <dependency>
    	<groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <!-- 负载均衡 -->
    <dependency>
    	<groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-loadbalancer</artifactId>
    </dependency>
  2. 通过 @EnableFeignClients 注解,启用 OpenFeign 功能:

    @EnableFeignClients
    @SpringBootApplication
    public class CartApplication { 
        //...略
    }
  3. 编写 FeignClient(无需编写实现类,OpenFeign 会动态生成):

    @FeignClient(value = "item-service")	// 根据value去nacos拉取列表
    public interface ItemClient {
        @GetMapping("/items")
        List<ItemDTO> queryItemByIds(@RequestParam("ids") Collection<Long> ids);
    }
  4. 使用 FeignClient,实现远程调用:

    List<ItemDTO> items = itemClient.queryItemByIds(List.of(1, 2, 3));

连接池

OpenFeign 对 Http 请求做了优雅的伪装,不过其底层发起 http 请求,依赖于其它的框架。这些框架可以自己选择,包括以下三种:

  • HttpURLConnection:默认实现,不支持连接池。
  • Apache HttpClient:支持连接池。
  • OKHttp:支持连接池。

具体源码可以参考 FeignBlockingLoadBalancerClient 类中的 delegate 成员变量。

OpenFeign 整合 OKHttp 的步骤如下:

  1. 引入依赖:

    <dependency>
    	<groupId>io.github.openfeign</groupId>
        <artifactId>feign-okhttp</artifactId>
        <version>13.3</version>
    </dependency>
  2. 开启连接池功能:

    feign:
      okhttp:
        enabled: true	# 开启OKHttp连接池支持

最佳实践

为了防止一个 OpenFeign Client 在多处微服务重复编写,我们需要进行代码的复用。

假设现在有两个微服务 B 和 C,都需要同时使用微服务 A 的 api,一种解决方案是:

  1. 将 A 拆分成三个 Maven 工程:A-dto 存放 A 服务的 dto,A-api 存放 A 服务的 OpenFeign Client,A-biz 存放 A 服务的业务代码。
  2. 后续 B 和 C 如果需要使用到与 A 相关的类和接口,只需要在 pom 文件中引入 A 服务对应的几个模块的坐标即可。

这样一来,虽然项目结构会比较复杂、松散。但是从代码层面上讲,确实这是一个能够提高复用性,且比较合理的解决方案。比较适合一开始就拆分成小模块的微服务项目。

另一种方案是:直接创建一个新的 Maven 工程,定义三个包 clientconfigdtoclient 包中存放各个微服务想要暴露的 OpenFeign Client;dto 包中存放所有需要使用的公共 dto 类。然后把需要复用的代码放入其中。

这样一来,项目结构虽然稍微简单点,但是也提高了代码的耦合度。比较适合 Maven 聚合型的工程。

当定义的 FeignClient 不在 SpringBootApplication 的扫描包范围时,这些 FeignClient 无法使用。有两种方式解决:

  1. 指定 FeignClient 所在包:

    @EnableFeignClients(basePackages = "com.hmall.api.clients")
  2. 指定 FeignClient 字节码:

    @EnableFeignClients(clients = {UserClient.class})

日志输出

OpenFeign 只会在 FeignClient 所在包的日志级别为 DEBUG 时,才会输出日志。而且 OpenFeign 自己的日志级别还有 4 级:

  • NONE:记录任何日志信息,这是默认值。
  • BASIC:记录请求的方法,URL 以及响应状态码和执行时间。
  • HEADERS:ASIC 的基础上,额外记录了请求和响应的头信息。
  • FULL:记录所有请求和响应的明细,包括头信息、请求体、元数据。

由于 Feign 默认的日志级别就是 NONE,所以默认我们看不到请求日志。要自定义日志级别需要生明一个类型为 Logger.Level 的 Bean,在其中定义日志级别:

public class DefaultFeignConfig {
    @Bean
    public Logger.Level feignLogLevel() {
        return Logger.Level.FULL;
        // 注意这里的Logger是import feign.Logger;
    }
}

但此时这个 Bean 并未生效,要想配置某个 FeignClient 的日志,可以在 @FeignClient 注解中声明:

@FeignClient(value = "item-service", configuration = DefaultFeignConfig.class)

如果想要全局配置,让所有 FeignClient 都按照这个日志配置来,则需要在 @EnableFeignClients 注解中声明:

@EnableFeignClients(defaultConfiguration = DefaultFeignConfig.class)

网关

单体架构拆分成微服务后,碰到的一个问题就是:每个微服务通过拆分后,访问的端口都是不一样的。这样子,前端在向后端发送请求的时候,也就无法确定要具体往哪个端口发送请求了。并且,登录的时候要求用户携带密钥,如果是微服务的情况下,密钥就需要往各个服务上分发,大大提高了密钥泄漏的风险。

网关:就是网络的关口,负责请求的路由、转发、身份校验

通过网关的路由传递,微服务的各种端口再也不需要暴露给前端了,我们只需要暴露网关的地址即可。并且,对于前端来讲,后端的各种细碎的微服务看起来就和一个单体架构一样,操作上没有任何难度的提升。

在 SpringCloud 中,网关的实现包括两种:

  • Spring Cloud Gateway:基于 WebFlux 响应式编程,Spring 官方出品,无需调优即可获得优异性能。
  • Netfilx Zuul:Netflix 出品,基于 Servlet 的阻塞式编程,需要调优才能获得与 SpringCloudGateway 类似的性能。

网关路由

要使用网关路由,一共有四步:

  1. 创建新模块:Maven 聚合工程下,创建一个新的 gateway 模块。

  2. 引入网关依赖:

    <!--网关-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
  3. 编写启动类:默认启动类配置即可,无需添加任何注解和参数。

  4. 配置路由规则(端口配置为与单体架构相同的端口——8080,并且也需要配置 nacos 相关的内容):

    spring:
      cloud:
        gateway:
          routes:
            - ids: item	# 路由规则id,自定义,唯一
              uri: lb://item-service	# 路由目标服务,lb表示负载均衡
              predicates: # 路由断言,判断请求是否符合规则,符合则路由到目标
                - Path=/items/**	# 以请求路径判断,以/items开头则符合
            - ids: xx
              uri: lb://xx-service
              predicates:
                - Path=/xx/**,/yy/**

路由属性

网关路由对应的 Java 类型是 RouteDefinition,其中常见的属性有:

  • id:路由唯一标识。
  • uri:路由目标地址。
  • predicates:路由断言,判断请求是否符合当前路由。
  • filters:路由过滤器,对请求或响应做特殊处理。

Spring 提供了 12 中基本的 RoutePredicateFactory 实现:

名称 说明
After 是某个时间点后的请求
Before 是某个时间点前的请求
Between 是某两个时间点之间的请求
Cookie 请求必须包含某些 cookie
Header 请求必须包含某些 header
Host 请求必须是访问某个 host(域名)
Method 请求方式必须是指定方式
Path 请求路径必须符合指定规则
Query 请求参数必须包含指定参数
RemoteAddr 请求者的 ip 必须是指定范围
Weight 权重处理
XForwarded Remote Addr 基于请求的来源 IP 做判断

Spring 提供了 33 种路由过滤器,每种过滤器都有独特的作用,这里不再一一列出。

网关登录校验

我们之前把登录校验的功能写在了用户服务上,现在,我们使用了网关,就必须保证登录校验在网关把请求转发给各个微服务之前完成。

网关请求处理流程如下:

所以,我们应该注册一个过滤器,保证在 NettyRoutingFilter 之前执行,并且是在 pre 阶段进行登录校验的。最后,再把用户登录信息传递给各个微服务。

不过,这个时候我们就无法使用 ThreadLocal 了,因为 ThreadLocal 是对同一个线程而言的,但是微服务是多个服务单独隔离,属于不同线程。这里,我们的解决方案是,把用户的登录信息放入请求头中,让微服务从请求头中拿取信息。

自定义过滤器

网关过滤器有两种,分别是:

  • GatewayFilter:路由过滤器,作用于任意指定的路由;默认不生效,要配置到路由后生效。GatewayFilter 自定义起来比较麻烦,可以直接使用 Spring 官方提供的过滤器,并在配置文件中配置即可。如果要自定义,可以自定义全局过滤器。

  • GlobalFilter:全局过滤器,作用范围是所有路由;声明后自动生效。

    @Component
    public class MyGlobalFilter implements GlobalFilter, Ordered {
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            // 模拟业务逻辑
            HttpHeaders headers = exchange.getRequest().getHeaders();
            System.out.println(headers);
            // 放行
            return chain.filter(exchange);
        }
    
        @Override
        public int getOrder() {
            // 保证顺序是在NettyRoutingFilter之前
            return 0;
        }
    }

实现登录校验

这一小节先展示如何在过滤器中实现登录 token 解析的业务逻辑代码:

@Component
@RequiredArgsConstructor
public class AuthGlobalFilter implements GlobalFilter, Ordered {

    private final AuthProperties authProperties;

    private final JwtTool jwtTool;

    private final AntPathMatcher antPathMatcher = new AntPathMatcher();

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // 获取request
        ServerHttpRequest request = exchange.getRequest();

        // 判断是否需要做登录拦截
        if (isExclude(request.getPath().toString())) {
            return  chain.filter(exchange);
        }

        // 需要拦截,获取token
        String token = null;
        List<String> headers = request.getHeaders().get("Authorization");
        if (headers != null && !headers.isEmpty()) {
            token = headers.get(0);
        }

        // 校验并解析token
        Long userId = null;
        try {
            userId = jwtTool.parseToken(token);
        } catch (UnauthorizedException e) {
            // 拦截,设置状态响应码为401
            ServerHttpResponse response = exchange.getResponse();
            response.setStatusCode(HttpStatus.UNAUTHORIZED);
            return response.setComplete();
        }

        // TODO 传递用户信息

        // 放行
        return chain.filter(exchange);
    }

    private boolean isExclude(String path) {
        List<String> excludePaths = authProperties.getExcludePaths();
        for (String excludePath : excludePaths) {
            if (antPathMatcher.match(excludePath, path)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

传递用户信息

在上一小节中,我们展示了如何在过滤器中使用业务逻辑代码,把携带在请求头中的 token 信息解析出来。token 中的信息是用户的 id 信息,接下来,我们就需要把这个信息传递给网关后方的各个微服务了。

首先,我们需要在网关过滤器中,把用户信息存储到请求头中:

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // 获取request
    // 判断是否需要做登录拦截
    // 需要拦截,获取token
    // 校验并解析token
    // ...上述代码略去
    
    // 传递用户信息
    String userInfo = userId.toString();
    ServerWebExchange webExchange = exchange.mutate()
            .request(builder -> builder.header("user-info", userInfo))
            .build();

    // 放行,需要传递新的exchange
    return chain.filter(webExchange);
}

接下来,由于每个微服务都可能有获取登录用户的需求,所以我们需要编写拦截器,把用户信息从请求头中存储到 ThreadLocal 中。但是这样一来,每一个微服务都需要编写一个拦截器,代码复用性很差。因此我们直接在 common 模块定义拦截器,这样微服务只需要引入依赖即可生效,无需重复编写:

public class UserInfoInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request,
                             HttpServletResponse response,
                             Object handler) throws Exception {
        // 获取登录用户信息
        String userInfo = request.getHeader("user-info");

        // 判断是否获取了
        if (StrUtil.isNotBlank(userInfo)) {
            // 存入ThreadLocal
            UserContext.setUser(Long.valueOf(userInfo));
        }

        // 放行,网关层已经做了token验证,这里不管怎样,直接放行即可
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request,
                                HttpServletResponse response,
                                Object handler, Exception ex) throws Exception {
        UserContext.removeUser();
    }
}

最后,还要在 common 模块中利用配置类 MvcConfig 进行注册。但是,别的包在自动装配的时候是没办法扫描到 common 模块的配置类的,所以,还需要在 common 模块下的 /resources/META-INF/spring.factories 文件中添加配置类扫描:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.hmall.common.config.MyBatisConfig,\
  com.hmall.common.config.MvcConfig,\
  com.hmall.common.config.JsonConfig

但是,网关模块不包含 SpringMVC 的任何 API,因为其是 Flux 非阻塞式,我们不希望 MvcConfig 能够在网关模块下生效,所以,需要在 MvcConfig 上添加条件装配注解,让其只在 MVC 模块中生效:

@Configuration
@ConditionalOnClass(DispatcherServlet.class)	// 利用MVC模块核心进行区分
public class MvcConfig implements WebMvcConfigurer {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new UserInfoInterceptor());
    }
}

OpenFeign 传递用户信息

上一小节,我们使用过滤器和拦截器,实现了从网关到微服务的用户信息传递。但是,有些业务场景,需要我们在各个微服务之间进行用户信息的传递(例如微服务远程调用的时候需要把用户信息传过去),那么这个时候又该怎么办呢?

假设现在服务 A 通过 OpenFeign 调用了服务 B 的某个功能,但是服务 B 的这个功能需要获取 ThreadLocal 的信息。因为这个请求是从服务发过来的,所以服务 B 的 ThreadLocal 里面是没有数据的(要想有数据,除非这个请求是从网关发过来的,这样才能走微服务的拦截器,然后把数据存到 ThreadLocal 里面)。

对于此类情况,我们可以采用 OpenFeign 来帮助我们实现这个功能。OpenFeign 中提供了一个拦截器接口,所有由OpenFeign 发起的请求都会先调用拦截器处理请求:

public class DefaultFeignConfig {
    @Bean
    public Logger.Level feignLoggerLevel() {
        return Logger.Level.FULL;
    }

    // 拦截器
    @Bean
    public RequestInterceptor userRequestInterceptor() {
        return new RequestInterceptor() {
            @Override
            public void apply(RequestTemplate template) {
                // 网关转过来的,在把这个请求发送给另一个服务前,TheadLocal依旧是可用的
                Long userId = UserContext.getUser();
                // 把id放入请求头中,保证服务之间的请求也携带user-info
                if (userId != null) {
                    template.header("user-info", userId.toString());
                }
            }
        };
    }
}

注意,需要确保在接收请求的服务上,@EnableFeignClients 注解中 defaultConfiguration 有配置 DefaultFeignConfig.class。这样子才能够让 OpenFeign 配置类生效。

配置管理

对于一个微服务项目,各个微服务都有配置文件,但是随着微服务数量的增多,配置文件也会增多,各种配置项也就增多。维护起来比较繁琐。并且,业务配置会经常变动,每次修改都要重启服务。此外,网关路由配置写死,如果变更,则需要重启网关……这些都会对我们造成困扰。

所以,我们需要配置管理来对可以共享的配置进行配置,来帮助我们降低维护成本。幸运的是,Nacos 组件不仅仅可以做注册中心的服务管理,同时也能帮助我们进行配置管理,实现共享配置和配置热更新等便捷功能。

配置共享

对于 jdbc、mybatis 等可以共用的配置,可以使用配置共享。

在 nacos 左侧的 “配置管理” 中,选择 “配置列表”,然后点击右侧的 “+” 号,便可以实现对共享配置的添加。其中,Data ID 为配置文件的名称。创建好共享的配置后,点击右下角的 “发布” 即可。

在配置文件中,使用 ${层级:默认值} 来对需要灵活替换的配置值进行处理,然后只需要在对应服务的配置文件中,把层级指定出来即可。

接下来,需要让各个微服务去拉取共享配置:

需要引入依赖:

<!--nacos配置管理-->
<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--读取bootstrap文件-->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>

接着,在需要读取共享配置的服务中,新建 bootstrap.yml,填入相关的配置信息:

spring:
  application:
    name: cart-service # 服务名称
  profiles:
    active: dev
  cloud:
    nacos:
      server-addr: 192.168.23.130:8848 # nacos地址
      config:
        file-extension: yaml # 文件后缀名
        shared-configs: # 共享配置
          - dataId: shared-jdbc.yaml # 共享mybatis配置
          - dataId: shared-log.yaml # 共享日志配置
          - dataId: shared-swagger.yaml # 共享日志配置

原先的 application.yml 需要进行配置的删除和更改:

server:
  port: 8082
feign:
  okhttp:
    enabled: true # 开启OKHttp连接池支持
hm:
  swagger:
    title: 购物车服务接口文档
    package: com.hmall.cart.controller
  db:
    database: hm-cart

配置热更新

配置热更新:当修改配置文件中的配置时,微服务无需重启即可使配置生效

前提条件:

  1. nacos 中要有一个与微服务名有关的配置文件。结构如下:[微服务名称-项目profile(可选).文件后缀名]

  2. 微服务中要以特定方式读取需要热更新的配置属性,示例:

    @Data
    @Component
    @ConfigurationProperties(prefix = "hm.cart")
    public class CartProperties {
        private Integer maxAmount;
    }

接下来,在 nacos 上面创建一个满足 [微服务名称-项目profile(可选).文件后缀名] 格式的配置文件,把配置属性填入其中。然后,在需要使用热更新的微服务中添加一个配置类,利用 @ConfigurationProperties 对配置的属性进行读取,以后的代码中,就使用这个配置类的属性来实现业务逻辑。

动态路由

要实现动态路由首先要将路由配置保存到 Nacos,当 Nacos 中的路由配置变更时,推送最新配置到网关,实时更新网关中的路由信息

实现动态路由前,需要先引入依赖(在 gateway 模块中):

<!--nacos配置管理-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--读取bootstrap文件-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>

接着,把 application.yml 中关于路由的配置删除,我们把路由信息交给 nacos 处理。并且,为了方便解析从 Nacos 读取到的路由配置,推荐使用 json 风格的路由配置,模板如下:

{
  "id": "item",
  "predicates": [{
    "name": "Path",
    "args": {"_genkey_0":"/items/**", "_genkey_1":"/search/**"}
  }],
  "filters": [],
  "uri": "lb://item-service"
}

上述 json 等价于:

spring:
  cloud:
    gateway:
      routes:
        - id: item
          uri: lb://item-service
          predicates:
            - Path=/items/**,/search/**

我们需要完成两件事情:

  1. 监听 Nacos 配置变更的消息。

    @Component
    @Slf4j
    @RequiredArgsConstructor
    public class DynamicRouteLoader {
    
        private final NacosConfigManager nacosConfigManager;
    
        private final String dataId = "gateway-routes.json";
    
        private final String group = "DEFAULT_GROUP";
    
        @PostConstruct  // 在Bean初始化之后执行
        public void initRouteConfigListener() throws NacosException {
            // 1.项目启动,先拉取一次配置,并添加配置监听器
            String configInfo = nacosConfigManager.getConfigService()
                    .getConfigAndSignListener(dataId, group, 5000, new Listener() {
                        @Override
                        public Executor getExecutor() {
                            return null;
                        }
    
                        @Override
                        public void receiveConfigInfo(String configInfo) {
                            // 2.监听到配置变更,需要更新路由表
                            updateConfigInfo(configInfo);
                        }
                    });
    
            // 3.第一次读取到配置,也需要更新到路由表
            updateConfigInfo(configInfo);
        }
    
        public void updateConfigInfo(String configInfo) {
            //...
        }
    
    }
  2. 当配置变更时,将最新的路由信息更新到网关路由表。

    private final RouteDefinitionWriter writer;
    
    private final Set<String> routeIds = new HashSet<>();
    
    // 我们拿到的configInfo实际上就是那个json串
    public void updateConfigInfo(String configInfo) {
        // 解析配置信息,转化为RouteDefinition
        List<RouteDefinition> routeDefinitions = JSONUtil.toList(configInfo, RouteDefinition.class);
    
        // 删除旧的路由表
        for (String routeId : routeIds) {
            writer.delete(Mono.just(routeId)).subscribe();
        }
    
        // 清空旧的路由id表
        routeIds.clear();
    
        // 更新路由表
        routeDefinitions.forEach(routeDefinition -> {
            writer.save(Mono.just(routeDefinition)).subscribe();
            // 记录路由id,便于下一次更新时删除
            routeIds.add(routeDefinition.getId());
        });
    }

以后,只需要在 nacos 左侧的配置列表里,更新 gateway-routes.json 这个配置即可实现动态路由。

微服务保护

雪崩问题

微服务调用链路中的某个服务故障,引起整个链路中的所有微服务都不可用,这就是雪崩。(一个服务因为宕机或者其他问题阻塞了,导致前面整个链路的服务都依次阻塞,雪崩)

要想解决雪崩问题,对于服务,可以使用请求限流的方法:限制访问微服务的请求的并发量,避免服务因流量激增出现故障。通过在服务接收请求前加入一个限流器,对并发的请求进行限制,让受保护的服务能够得到一个比较平稳的请求流,减少宕机的可能。

而对于请求,可以采用线程隔离的方法:也叫做舱壁模式,模拟船舱隔板的防水原理。通过限定每个业务能使用的线程数量而将故障业务隔离,避免故障扩散。

对于请求,还需要有服务熔断策略:由断路器统计请求的异常比例或慢调用比例,如果超出阈值则会熔断该业务,则拦截该接口的请求。熔断期间,所有请求快速失败,然后走 fallback 的回调逻辑,进行兜底(例如返回默认数据,或者向前端返回提示)。

服务保护技术:

服务保护技术 Sentinel Hystrix
线程隔离 信号量隔离 线程池隔离 / 信号量隔离
熔断策略 基于慢调用比例或异常比例 基于异常比率
限流 基于 QPS,支持流量整形 有限的支持
Fallback 支持 支持
控制台 开箱即用,可配置规则、查看秒级监控、机器发现等 不完善
配置方式 基于控制台,重启后失效 基于注解或配置文件,永久生效

Sentinel

Sentinel 是阿里巴巴开源的一款微服务流量控制组件。使用 Sentinel 前,需要前往官方的下载地址上下载控制台 jar 包,然后使用如下命令启动:

java -Dserver.port=8090 -Dcsp.sentinel.dashboard.server=localhost:8090 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar

接下来,通过 ip:8090 即可访问控制台,控制台登录的账号和密码默认都是 sentinel。

接下来,就需要让我们的项目能够整合 sentinel,引入依赖:

<!--sentinel-->
<dependency>
    <groupId>com.alibaba.cloud</groupId> 
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

在需要和 sentinel 建立连接的项目配置文件中,添加如下片段:

spring:
  cloud: 
    sentinel:
      transport:
        dashboard: localhost:8090

在 Sentinel 控制台左侧,有一栏 “簇点链路”,簇点链路就是单机调用链路。是一次请求进入服务后经过的每一个被Sentinel 监控的资源链。默认 Sentinel 会监控 SpringMVC 的每一个 Endpoint(http 接口)。限流、熔断等都是针对簇点链路中的资源设置的。而资源名默认就是接口的请求路径。而 Restful 风格的 API 请求路径一般都相同,这回导致簇点资源名称重复。因此我们要修改配置,把 “请求方式 + 请求路径” 作为簇点资源名称:

spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8090
      http-method-specify: true	# 开启请求方式前缀

请求限流

在簇点链路后面点击流控按钮,即可对其做限流配置:直接在 “阈值类型” 中,对 QPS 进行单机阈值配置即可。在被限流拒绝的请求里,会返回 429 状态码,意为该请求被限流拒绝

线程隔离

在 sentinel 控制台中,会出现 Feign 接口的簇点资源,依旧是点击后面的流控按钮,即可配置线程隔离:直接在 “阈值类型” 中,对并发线程数进行单机阈值配置即可。同样因为线程隔离而导致的访问失败,也会返回 429 状态码。

当然,如果线程因为线程隔离而被打回,还是需要做一定的处理的,我们需要添加 Fallback 回调:

  1. 将 FeignClient 作为 Sentinel 的簇点资源(让除了 MVC 之外的请求也可以 Fallback 和隔离,并且主要也是想让远程调用的逻辑具有 Fallback 功能):

    feign:
      sentinel:
        enabled: true
  2. FeignClient 的 Fallback 有两种配置方式:

    1. FallbackClass,无法对远程调用的异常做处理。
    2. FallbackFactory,可以对远程调用的异常做处理,通常会选择这种。

给 FeignClient 编写 Fallback 逻辑,假设现在有这样的一个客户端:

@FeignClient("item-service")
public interface ItemClient {

    @GetMapping("/items")
    List<ItemDTO> queryItemByIds(@RequestParam("ids") Collection<Long> ids);

    @PutMapping("/items/stock/deduct")
    void deductStock(@RequestBody List<OrderDetailDTO> items);

}

为其编写 Fallback 逻辑:

  1. 自定义类,实现 FallbackFactory,编写对某个 FeignClient 的 Fallback 逻辑(在里面会再次创建一个 Client,当原本的 Client 失效异常后,就会走这个具有 Fallback 逻辑的 Client):

    @Slf4j
    public class ItemClientFallbackFactory implements FallbackFactory<ItemClient> {
    
        @Override
        public ItemClient create(Throwable cause) {
            return new ItemClient() {
                @Override
                public List<ItemDTO> queryItemByIds(Collection<Long> ids) {
                    log.error("查询商品失败: " + cause);
                    return CollUtils.emptyList();
                }
    
                @Override
                public void deductStock(List<OrderDetailDTO> items) {
                    log.error("扣减库存失败: " + cause);
                    throw new RuntimeException(cause);
                }
            };
        }
    }
  2. 将刚刚定义的 FallbackFactory 注册为一个 Bean:

    public class DefaultFeignConfig {
        
        //...
        
        @Bean
        public ItemClientFallbackFactory itemClientFallbackFactory() {
            return new ItemClientFallbackFactory();
        }
    }
  3. 在接口中使用 FallbackFactory:

    // 使用fallbackFactory来使用FallbackFatory
    @FeignClient(value = "item-service", fallbackFactory = ItemClientFallbackFactory.class)
    public interface ItemClient {
    
        @GetMapping("/items")
        List<ItemDTO> queryItemByIds(@RequestParam("ids") Collection<Long> ids);
    
        @PutMapping("/items/stock/deduct")
        void deductStock(@RequestBody List<OrderDetailDTO> items);
    
    }

服务熔断

熔断是解决雪崩问题的重要手段。思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务。即拦截访问该服务的一切请求;而当服务恢复时,断路器会放行访问该服务的请求。

断路器的工作状态切换由一个状态机来控制:

点击控制台中簇点资源后的熔断按钮,即可配置熔断策略。

分布式事务

对于某些微服务的业务,一些业务操作涉及到多张表的读写。例如,下单业务需要我们先创建订单,去对订单表进行增的操作。之后需要去清理用户购物车,需要对购物车表做删的操作。最后再去扣减商品库存,需要对商品库存表做改的操作。原本在单体架构中,这些操作能够用事务来管理,满足 ACID 属性。但是,现在单体架构被拆分成了一个个的微服务,由于远程调用的存在,事务的 ACID 属性有时候便难以保证了。

在分布式系统中,如果一个业务需要多个服务合作完成,而且每一个服务都有事务,多个事务必须同时成功或失败,这样的事务就是分布式事务。其中的每个服务的事务就是一个分支事务。整个业务称为全局事务

Seata

Seata 是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。致力于提供高性能和简单易用的分布式事务服务,为用户打造一站式的分布式解决方案。

想要解决分布式事务,各个子事务之间必须能感知到彼此的事务状态,才能保证状态一致。在 Seata 中,事务管理有三个重要角色:

  • TC(Transaction Coordinator):事务协调者,维护全局和分支事务的状态,协调全局事务提交或回滚。
  • TM(Transaction Manager):事务管理器,定义全局事务的范围、开始全局事务、提交或回滚全局事务。
  • RM(Resource Manager):资源管理器,管理分支事务,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

部署 TC 服务

部署 TC 服务,需要三步:

  1. 准备数据库表:Seata 支持多种存储模式,但考虑到持久化的需要,我们一般选择基于数据库存储。创建数据库的表所需要的 sql 文件戳这里

  2. 准备配置文件:使用 yml 文件对 Seata 进行配置。

  3. 基于 docker 部署:

    docker run --name seata -p 8099:8099 -p 7099:7099 -e SEATA_IP=192.168.23.130 -v /home/root2/Downloads/seata:/seata-server/resources --privileged=true --network hm-net -d seataio/seata-server:1.5.2

seata 的登录账号密码默认为 admin。

微服务集成 Seata

首先,还是引入依赖:

<!--统一配置管理-->
<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--读取bootstrap文件-->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<!--seata-->
<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

然后,在 nacos 上添加一个共享的 seata 配置,命名为 shared-seata.yaml

seata:
  registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
    type: nacos # 注册中心类型 nacos
    nacos:
      server-addr: 192.168.23.130:8848 # nacos地址
      namespace: "" # namespace,默认为空
      group: DEFAULT_GROUP # 分组,默认是DEFAULT_GROUP
      application: seata-server # seata服务名称
      username: nacos
      password: nacos
  tx-service-group: hmall # 事务组名称
  service:
    vgroup-mapping: # 事务组与tc集群的映射关系
      hmall: "default"

然后,在需要使用分布式事务的微服务上,新增 bootstrap.yml 文件:

spring:
  application:
    name: trade-service # 服务名称
  profiles:
    active: dev
  cloud:
    nacos:
      server-addr: 192.168.23.130 # nacos地址
      config:
        file-extension: yaml # 文件后缀名
        shared-configs: # 共享配置
          - dataId: shared-jdbc.yaml # 共享mybatis配置
          - dataId: shared-log.yaml # 共享日志配置
          - dataId: shared-swagger.yaml # 共享日志配置
          - dataId: shared-seata.yaml # 共享seata配置

然后再把 application.yml 中的一些冗余的共享配置删除:

server:
  port: 8085
feign:
  okhttp:
    enabled: true # 开启OKHttp连接池支持
  sentinel:
    enabled: true # 开启Feign对Sentinel的整合
hm:
  swagger:
    title: 交易服务接口文档
    package: com.hmall.trade.controller
  db:
    database: hm-trade

经过上述的部署和集成后,Seata 服务就已经可以和我们的微服务项目产生关联了。接下来,就可以使用 Seata 来帮助我们解决分布式事务的问题了。

XA 模式

XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,XA 规范描述了全局的 TM 与局部的 RM 之间的接口,几乎所有主流的数据库都对 XA 规范 提供了支持。

Seata 的 XA 模式如下(两阶段提交):

XA 模式的优点:

  • 事务的强一致性,满足ACID原则。
  • 常用数据库都支持,实现简单,并且没有代码侵入。

XA 模式的缺点:

  • 因为一阶段需要锁定数据库资源,等待二阶段结束才释放,性能较差。
  • 依赖关系型数据库实现事务。

使用 XA 模式如下:

  1. 在对要使用 XA 事务管理的微服务的 application.yml 文件中开启 XA 模式:

    seata:
      data-source-proxy-mode: XA
  2. 使用 @GlobalTransactional 标记分布式事务的入口:

    @Override
    @GlobalTransactional
    public Long createOrder(OrderFormDTO orderFormDTO) {
        // ...
        // 1.3.查询商品
        List<ItemDTO> items = itemClient.queryItemByIds(itemIds);
        // ...
        // 1.6.将Order写入数据库order表中
        save(order);
    
        // 2.保存订单详情
        List<OrderDetail> details = buildDetails(order.getId(), items, itemNumMap);
        detailService.saveBatch(details);
    
        // 3.清理购物车商品
        cartClient.deleteCartItemByIds(itemIds);
    
        // 4.扣减库存
        try {
            itemClient.deductStock(detailDTOS);
        } catch (Exception e) {
            throw new RuntimeException("库存不足!");
        }
        return order.getId();
    }

AT 模式

Seata 主推的是 AT 模式,AT 模式同样是分阶段提交的事务模型,不过缺弥补了 XA 模型中资源锁定周期过长的缺陷。(空间换时间)

使用 AT 模式如下:

  1. 需要,先在每一个需要分布式事务管理的微服务的数据库中生成一张 undo_log 表,用来存储更新快照。(对于本项目,使用一个 database 来模拟一个独立的数据库,所以每一个 database 都需要一张这个表。生成表的 sql 文件戳我访问

  2. 然后,修改 application.yml 文件,把事务模式更改为 AT 模式:

    seata:
      data-source-proxy-mode: AT	# seata默认就是AT模式,不写也可以
  3. 使用 @GlobalTransactional 注解标记全局事务。

AT 模式与 XA 模式最大的区别是什么?

  • XA 模式一阶段不提交事务,锁定资源;AT 模式一阶段直接提交,不锁定资源。
  • XA 模式依赖数据库机制实现回滚;AT 模式利用数据快照实现数据回滚。
  • XA 模式强一致;AT 模式最终一致(在第一阶段资源直接提交时,会出现短暂的数据不一致)。

RabbitMQ 基础

RabbitMQ 入门

同步 / 异步调用优缺点

同步调用:

  • 优点:时效性强,等待到结果后才返回。大部分情况下的服务还是会采用同步调用。
  • 缺点:拓展性差、性能下降、级联失效。

异步调用:

异步调用通常是基于消息通知的方式,包含三个角色:

  • 消息发送者:投递信息的人,就是原来的调用者。
  • 消息接收者:接收和处理消息的人,就是原来的服务提供者。
  • 消息代理:管理、暂存、转发消息,你可以把它理解成一个消息服务器。

其优缺点如下:

  • 优点:耦合度更低、性能更好、业务拓展性更强。并且有故障隔离,避免了级联失效。
  • 缺点:完全依赖于 Broker 的可靠性、安全性和性能。且架构复杂,后期维护和调式麻烦。

MQ 技术选型

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的 Broker。

RabbitMQ ActiveMQ RocketMQ Kafka
公司 / 社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

RabbitMQ 安装部署

我们同样基于 Docker 来安装 RabbitMQ,使用下面命令:

docker run  -e RABBITMQ_DEFAULT_USER=rabbitmq  -e RABBITMQ_DEFAULT_PASS=123456  -v mq-plugins:/plugins  --name mq  --hostname mq  -p 15672:15672  -p 5672:5672  --network hm-net -d  rabbitmq:3.8-management

安装完成后,我们访问 虚拟机ip:15672 即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。

RabbitMQ 架构如下:

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方。
  • consumer:消费者,也就是消费消息的一方。
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理。
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的 exchange、queue。

消息发送的注意事项:

  • 交换机只能路由消息,无法存储消息。
  • 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定。

Java 客户端

快速入门

我们在 Spring 中使用的是 Spring AMQP 来操作 RabbitMQ,Spring AMQP 是基于 AMQP 协议定义的一套 API 规范,提供了模板来发送和接收消息。包含两部分,其中 spring-amqp 是基础抽象,spring-rabbit 是底层的默认实现。

AMQP(Advanced Message Queuing Protocol),是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP 提供了三个功能:

  • 自动声明队列、交换机及其绑定关系。
  • 基于注解的监听器模式,异步接收消息。
  • 封装了 RabbitTemplate 工具,用于发送消息。

使用 Spring AMQP 时需要导入依赖:

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在控制台中创建一个队列后(名称为 simple.queue),我们尝试使用相对应的 API 来进行消息的发送和接收。首先,需要在每个微服务中引入 MQ 服务端信息(无论是消息的发送方还是接收方都需要配置),这样微服务才能连接到 RabbitMQ:

spring:
  rabbitmq:
    host: 192.168.23.130	# 主机名
    port: 5672
    virtual-host: /hmall	# 虚拟主机
    username: hmall		    # 用户名
    password: 123		    # 密码

消息发送端:

@SpringBootTest
public class SpringAmqpTest {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {

        // 队列名
        String queueName = "simple.queue";

        // 消息
        String message = "hello, spring amqp";

        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }

}

消息接收端:

@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")    // 指定监听的队列
    public void listenSimpleQueue(String message) {
        log.info("监听到{}", message);
    }

}

经过上述操作,就可以完成基于 RabbitMQ 的简单消息发送和接收。

Work Queues

Work Queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。在 Work Queues 中,多个消费者消费消息是负载均衡的,可以有效帮助我们降低单个消费者压力。

在实际开发中,消费者的微服务一般是和其他微服务独立开来的。并且,我们也不会真的去把一份监听的代码重复写好几遍,而是在服务器上部署多个消费者微服务的实例(多实例部署),以次来形成 Work Queues。

不过,在 Work Queues 中,所谓的 “负载均衡” 实际上是把工作量均分。也就是说假设有 50 次请求需要处理,有两个消费者,则每个消费者处理 25 次。这就可能导致一些问题出现:假设现在两个消费者一个处理请求的速度较快,另一个较慢。就会出现处理得快的很快就完成了,然后空闲下来,此时那个处理得慢得还在一直处理,最终用户的等待时间是由那个处理得慢的消费者决定的,这在一定程度上会导致资源的浪费以及时间的浪费。

也就是说,默认情况下,RabbitMQ 的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积
因此我们需要修改 application.yml,设置 preFetch 值为 1,确保同一时刻最多投递给消费者 1 条消息:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次最多获取一条消息,处理完成才能获取下一个消息

这样子,处理得快的消费者就会被分配到更多的请求,“能者多劳”,可以有效缩短响应时间。

Fanout 交换机

上述的实例操作,都是直接把消息发送到队列上,但是实际开发中,我们更多情况下是先把消息发送给交换机,再由交换机把消息路由到各个队列中,再由各个消费者进行消息消费。所以,交换机的作用主要是接收发送者发送的消息,并将消息路由到与其绑定的队列。

常见的交换机类型有三种:

  • Fanout:广播。
  • Direct:定向。
  • Topic:话题。

我们这节先来讨论 Fanout 交换机:

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的 queue,所以也叫广播模式。

首先,在控制台中创建一个 hmall.fanout 交换机,再创建两个队列 fanout.queue1, fanout.queue2,把两个队列绑定到交换机上,然后编写监听器代码:

@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String message) {
        log.info("监听到fanout.queue1: {}", message);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String message) {
        log.info("监听到fanout.queue2: {}", message);
    }

}

编写发送者代码:

@Test
public void testFanoutQueue() {

    // 交换机名
    String exchangeName = "hmall.fanout";

    // 消息
    String message = "hello, everyone!";

    // 发送消息,需要使用交换机的重载方法,将RoutingKey指定为null
    rabbitTemplate.convertAndSend(exchangeName, null, message);
}

这样即可完成消息的广播。

Direct 交换机

这节我们来讨论 Direct 交换机。

Direct Exchange 会将接收到的消息根据规则路由到指定的 Queue,因此称为定向路由

  • 每一个 Queue 都与 Exchange 设置一个 BindingKey。
  • 发布者发布消息时,指定消息的 RoutingKey。
  • Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的队列。
// 发送消息,需要指定RoutingKey
rabbitTemplate.convertAndSend(exchangeName, "yellow", message);

注意,在控制台绑定 RoutingKey 的时候,如果一个队列需要绑定多个 key,需要一个一个 key 去指定,没办法批量指定。

Topic 交换机

这节我们来讨论 Topic 交换机。

Topic Exchange 也是基于 RoutingKey 做消息路由,但是 routingKey 通常是多个单词的组合,并且以 . 分割。

Queue 与 Exchange 指定 BindingKey 时可以使用通配符:

  • #:指代 0 个或者多个单词。

  • *:指代一个单词。

例如,某个队列的 RoutingKey 设置成 hnu.*,那么,就可以匹配 hnu.newshnu.weather 等 RoutingKey。

声明队列交换机

目前,队列和交换机的声明都是在控制台中人工手动声明的。但是我们希望使用代码来对队列和交换机进行声明。

Spring AMQP 提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类 QueueBuilder 构建。
  • Exchange:用于声明交换机,可以用工厂类 ExchangeBuilder 构建。
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类 BindingBuilder 构建。

一般情况下,声明队列、交互机的相关代码会编写到消费者端:

@Configuration
public class FanoutConfig {

    // 声明交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        // 可以用new,也可以用build
        // return new FanoutExchange("hmall.fanout");
        return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
    }

    // 声明队列
    @Bean
    public Queue fanoutQueue1() {
        // durable-持久化,意为这个队列会持久化到磁盘上
        return QueueBuilder.durable("fanout.queue1").build();
    }

    // 声明绑定关系
    @Bean
    public Binding fanoutQueue1Binding(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    
    // ...更多队列和绑定关系
}

当然,你也可以通过注解来声明,需要声明在消费者上:

@RabbitListener(bindings = @QueueBinding(   // 队列绑定
        value = @Queue("direct.queue1"),    // 指定队列
        // 指定交换机
        exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void listenDirectQueue1(String message) {
    log.info("监听到direct.queue1: {}", message);
}

消息转换器

我们在使用消息队列发送消息时,消息不仅仅可以是 String 类型,还可以是别的类型。而 Spring 对消息对象的处理是由 org.springframework.amqp.support.converter.MessageConverter 来处理的。而默认实现是 SimpleMessageConverter,基于 JDk 的 ObjectOutputStream 完成序列化。

存在以下问题:

  • JDK 的序列化有安全风险(容易被代码注入)。
  • JDK 序列化的消息体积太大,占用空间太多。
  • JDK 序列化的消息可读性差。

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了 spring-boot-starter-web 依赖,则无需再次引入 Jackson 依赖。

配置消息转换器,在 publisher 和 consumer 两个服务的启动类中添加 Bean 即可:

@Bean
public MessageConverter messageConverter() {
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

或者配置在 common 模块下:

@Configuration
@ConditionalOnClass(RabbitTemplate.class)	// 记得用条件配置
public class MqConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

记得在 /resources/META-INF/spring.factories 中声明配置:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.hmall.common.config.MyBatisConfig,\
  com.hmall.common.config.MvcConfig,\
  com.hmall.common.config.JsonConfig,\
  com.hmall.common.config.MqConfig

业务改造

使用 RabbitMQ,改造支付业务,当请求支付业务时,同步调用用户服务扣减余额,异步调用其他服务,更新支付状态(更新订单、短信通知、增加积分等等)。

监听器如下:

@Component
@RequiredArgsConstructor
public class PayStatusListener {

    // 注入service
    private final IOrderService orderService;

    // 指定交换机和队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "trade.pay.success.queue"),
            exchange = @Exchange(name = "pay.direct"),
            key = "pay.success"
    ))
    public void ListenPaySuccess(Long orderId) {
        // pay发消息后trade模块来处理消息
        orderService.markOrderPaySuccess(orderId);
    }

}

整体业务如下:

private final RabbitTemplate rabbitTemplate;

@Override
@Transactional
public void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {
    
    // ...同步业务略去
    
    
    // 5.修改订单状态,需要try-catch
    // tradeClient.markOrderPaySuccess(po.getBizOrderNo());
    try {
        rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
    } catch (AmqpException e) {
        // 兜底方案见RabbitMQ高级篇
        throw new RuntimeException(e);
    }
}

RabbitMQ 高级

来看一下目前 MQ 存在的消息可靠性问题:在消息队列中,消息的传输是依靠网络的。所以,如果消息发送者在发送消息的时候网络故障,则会导致消息丢失。或者,当 MQ 因为故障而宕机或者消费者宕机,这样也会导致消息丢失。

所以,我们需要从发送者、MQ、消费者的三个角度来探索 MQ 在发送和接收消息时的可靠性问题,提高项目的健壮性。

发送者可靠性

发送者可靠性可以通过两个方案来保证:

  • 发送者重连。
  • 发送者确认。

发送者重连

先来看发送者重连。有的时候由于网络波动,可能会出现发送者连接 MQ 失败的情况。通过配置我们可以开启连接失败后的重连机制(这个机制默认关闭,我们需要手动开启):

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过 Spring AMQP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

发送者确认

一般情况下,只要生产者与 MQ 之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ 内部处理消息的进程发生了异常。
  • 生产者发送消息到达 MQ 后未找到 Exchange。
  • 生产者发送消息到达 MQ 的 Exchange 后,未找到合适的 Queue,因此无法路由。

针对上述情况,RabbitMQ 提供了生产者消息确认机制,包括 Publisher ConfirmPublisher Return 两种。在开启确认机制的情况下,当生产者发送消息给 MQ 后,MQ 会根据消息处理的情况返回不同的回执

总结如下:

  • 当消息投递到 MQ,但是路由失败时,通过 Publisher Return 返回异常信息,同时返回 ack 的确认信息,代表投递成功。
  • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功。
  • 持久消息投递到了 MQ,并且入队完成持久化,返回 ACK ,告知投递成功。
  • 其它情况都会返回 NACK,告知投递失败。

想要实现发送者确认,需要以下几步:

  1. 开启发送者确认:

    spring:
      rabbitmq:
        publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
        publisher-returns: true # 开启publisher return机制

    这里 publisher-confirm-type 有三种模式可选:

    • none:关闭 confirm 机制。
    • simple:同步阻塞等待 MQ 的回执。
    • correlated:MQ 异步回调返回回执。

    一般我们推荐使用 correlated,回调机制。

  2. 每个 RabbitTemplate 只能配置一个 ReturnCallback,因此需要在项目启动过程中配置:

    @Slf4j
    @Configuration
    @RequiredArgsConstructor
    public class MqConfig {
    
        private final RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            rabbitTemplate.setReturnsCallback(returnedMessage -> {
                log.error("监听到了消息return callback");
                log.debug("交换机: {}", returnedMessage.getExchange());
                log.debug("RoutingKey: {}", returnedMessage.getRoutingKey());
                log.debug("Message: {}", returnedMessage);
                log.debug("replyCode: {}", returnedMessage.getReplyCode());
                log.debug("replyText: {}", returnedMessage.getReplyText());
            });
        }
    
    }
  3. 发送消息,指定消息 ID、消息 ConfirmCallback:

    @Test
    public void testConfirmCallback() {
        // 创建CorrelationData,保证id唯一性
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    
        // 添加future callback,回执会通过Future来返回
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                // Future发生异常时的处理逻辑,基本不会触发
                log.error("send message fail", ex);
            }
    
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                // 判断消息是否成功被接收
                if (result.isAck()) {
                    log.debug("收到ConfirmCallback ack, 消息发送成功");
                } else {
                    log.error("收到ConfirmCallback nack, 消息发送失败。原因: {}", result.getReason());
                }
            }
        });
    
    
        // 交换机名
        String exchange = "hmall.direct";
    
        // 消息
        String message = "hello";
    
        // 发送消息
        rabbitTemplate.convertAndSend(exchange, "blue", message, cd);
    }

注意

开启生产者确认比较消耗 MQ 性能,一般不建议开启。而且考虑触发确认的几种情况:

  • 路由失败:一般是因为 RoutingKey 错误导致,往往是编程导致。
  • 交换机名称错误:同样是编程错误导致。
  • MQ 内部故障:这种需要处理,但概率往往极低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启 ConfirmCallback 处理 nack 就可以了。

MQ 可靠性

消息到达 MQ 以后,如果 MQ 不能及时保存,也会导致消息丢失,所以 MQ 的可靠性也非常重要。在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦 MQ 宕机,内存中的消息就会丢失。
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发 MQ 阻塞。

MQ 的可靠性有两个方案可以保证:

  • 数据持久化。
  • Lazy Queue。

数据持久化

默认情况下,队列、交换机等的持久化是开启的,无需我们更改。但是,消息的持久化需要我们在控制台中手动指定。在 “Queues” 中的 “Publish message” 里,把 “Delivery mode” 改为 “Persistent” 即可。

不过,Spring 默认发出去的消息就是持久化的。出于性能考虑,为了减少 IO 次数,发送到 MQ 的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在 100 毫秒左右,这就会导致 ACK 有一定的延迟,因此建议生产者确认全部采用异步方式。

Lazy Queue

在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障。
  • 消息发送量激增,超过了消费者处理速度。
  • 消费者处理业务发生阻塞。

一旦出现消息堆积问题,RabbitMQ 的内存占用就会越来越高,直到触发内存预警上限。此时 RabbitMQ 会将内存消息刷到磁盘上,这个行为成为 PageOut. PageOut 会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ 不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存。
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)。
  • 支持数百万条的消息存储。

而在 3.12 版本之后,Lazy Queue 已经成为所有队列的默认格式。因此官方推荐升级 MQ 为 3.12 版本或者所有队列都设置为 Lazy Queue 模式。

如果版本较低,需要手动指定 Lazy Queue,只需要在声明队列的时候,指定 “Arguments” x-queue-mode=lazy 即可。

或者用代码添加:

@Bean
public Queue lazyQueue() {
    return QueueBuilder.durable("lazy.queue").lazy().build();
}

// 或者
@RabbitListener(queuesToDeclare = @Queue(
		name = "lazy.queue",
    	durable = "true",
    	arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {
    // ...
}

消费者可靠性

当 RabbitMQ 向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障。
  • 消费者接收到消息后突然宕机。
  • 消费者接收到消息后,因处理不当导致异常。

一旦发生上述情况,消息也会丢失。因此,RabbitMQ 必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。

但问题来了:RabbitMQ 如何得知消费者的处理状态呢?本章我们就一起研究一下消费者处理消息时的可靠性解决方案。

消费者确认机制

消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。当消费者处理消息结束后
应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态:

  • ack:成功处理消息,RabbitMQ 从队列中删除消息。
  • nack:消息处理失败,RabbitMQ 需要再次投递消息。
  • reject:消息处理失败并拒绝 RabbitMQ 的再次投递。

一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过 try catch 机制捕获,消息处理成功时返回 ack,处理失败时返回 nack。

由于消息回执的处理代码比较统一,因此 Spring AMQP 帮我们实现了消息确认。并允许我们通过配置文件设置 ACK 处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用 api,发送 ackreject,存在业务入侵,但更灵活。
  • auto:自动模式。Spring AMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回 nack
    • 如果是消息处理或校验异常,自动返回 reject
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto

消费者失败重试机制

Spring AMQP 提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的 requeue 到 mq。我们可以通过在 application.yaml 文件中添加配置来开启重试机制:

spring:
  rabbitmq:
    listener:
      simple:
      	prefetch: 1
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息,默认就是这种方式。
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

那么,如何更改重试策略呢?首先我们需要定义相关的交换机、队列和绑定关系(专门用来处理 error 消息的)。然后,定义 RepublishMessageRecoverer:

@Configuration
@RequiredArgsConstructor
public class ErrorMessageConfig {

    private final RabbitTemplate rabbitTemplate;

    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorQueueBinding(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    // 失败处理策略
    public MessageRecoverer messageRecoverer() {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }

}

业务幂等处理

何为幂等性?

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。

在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据 id 删除数据。
  • 查询数据。
  • 新增数据。

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况。
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。

然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交。
  • 服务间调用的重试。
  • MQ 消息的重复投递。

我们在用户支付成功后会发送 MQ 消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。

举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID。每一条消息都生成一个唯一的 id,与消息一起投递给消费者。消费者接收到消息后处理自己的业务,业务处理成功后将消息 ID 保存到数据库。如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

    @Bean
    public MessageConverter messageConverter() {
        // 定义消息转换器
        Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    
        // 配置自动创建消息id,用于识别不同消息
        jjmc.setCreateMessageIds(true);
    
        return jjmc;
    }

    接收的时候需要用 Message 接:

    @RabbitListener(queues = "simple.queue")    // 指定监听的队列
    public void listenSimpleQueue(Message message) {
        log.info("监听到{}", new String(message.getBody()));
        log.info("消息id为: {}", message.getMessageProperties().getMessageId());
    }

    不过,这种方案因为需要写代码把 id 保存到数据库并且还要查询等,属于业务侵入。并且,涉及到更多的数据库操作,使得业务性能也一般,因此我们不太推荐这种做法。

  • 业务状态判断。业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。

    例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

    @Component
    @RequiredArgsConstructor
    public class PayStatusListener {
    
        private final IOrderService orderService;
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "trade.pay.success.queue"),
                exchange = @Exchange(name = "pay.direct"),
                key = "pay.success"
        ))
        public void ListenPaySuccess(Long orderId) {
            // 查询订单
            Order order = orderService.getById(orderId);
    
            // 判断订单状态是否为未支付
            if (order == null || order.getStatus() != 1) {
                // 不做处理
                return;
            }
    
            // pay发消息后trade模块来处理消息
            orderService.markOrderPaySuccess(orderId);
        }
    
    }

延迟消息

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

例如,订单支付超时时间为 30 分钟,则我们应该在用户下单后的第 30 分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第 30 分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用 MQ 的延迟消息了。

  • 延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。

  • 延迟任务:设置在一定时间之后才执行的任务。

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.rejectbasic.nack 声明消费失败,并且消息的 requeue 参数设置为 false。
  • 消息是一个过期消息,超时无人消费。
  • 要投递的队列消息满了,无法投递。

如果一个队列中的消息已经成为死信,并且这个队列通过 dead-letter-exchange 属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息。
  2. 收集那些因队列满了而被拒绝的消息。
  3. 收集因 TTL(有效期)到期的消息。

利用死信交换机可以实现消息延迟的效果:首先,publisher 设置一个 TTL,把消息投递给某个普通交换机,接着存储到某个队列中,但是,不要让任何消费者去消费这个普通队列中的信息。这样,当过了 TTL,普通队列中的消息就会被投递到其绑定的私信交换机中(两个 RoutingKey 要一致),由死信交换机对应的消费者处理掉消息。这样,就实现了消息延迟处理。

// 死信交换机消费者
@RabbitListener(bindings = @QueueBinding(   
        value = @Queue("dlx.queue"),   
        exchange = @Exchange(name = "dlx.direct"),
        key = {"hi"}
))
public void listenDlxQueue(String message) {
    log.info("监听到dlx.queue: {}", message);
}

使用默认的配置方式代替注解方式生成普通队列和交换机,防止普通队列和交换机出现消费者:

@Configuration
public class NormalConfig {

    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal.direct");
    }

    // 绑定死信交换机
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();
    }

    @Bean
    public Binding normalExchangeBinding(Queue directQueue, DirectExchange normalExchange) {
        return BindingBuilder.bind(directQueue).to(normalExchange).with("hi");
    }

}

设置消息过期时间:

@Test
public void testSendDelayData() {
    // 发送延迟消息
    rabbitTemplate.convertAndSend("normal.direct", "hi",
            "hello", message -> {
                // 设置消息过期时间
                message.getMessageProperties().setExpiration("10000");
                return message;
            });
}

延迟消息插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此 RabbitMQ 社区提供了一个延迟消息插件 DelayExchange 来实现相同的效果。

如果是 Docker 部署,需要先查看 RabbitMQ 的插件目录对应的数据卷:

docker volume inspect mq-plugins

找到 “Mountpoint” 所描述的目录,把插件文件上传上去即可。接下来,执行命令,启动插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动好后,就可以开始实现延迟消息了。

声明延迟交换机:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}
@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

发送消息时,必须通过 x-delay 属性设定延迟时间:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。

因此,不建议设置延迟时间过长的延迟消息

Elasticsearch 基础

部署与安装

由于数据库模糊查询不走索引,在数据量较大的时候,查询性能很差。数据库模糊查询随着表数据量的增多,查询性能的下降会非常明显,而搜索引擎的性能则不会随着数据增多而下降太多。目前仅 10 万不到的数据量差距就如此明显,如果数据量达到百万、千万、甚至上亿级别,这个性能差距会非常夸张。

Elasticsearch 是由 elastic 公司开发的一套搜索引擎技术,它是 elastic 技术栈中的一部分。完整的技术栈包括:

  • Elasticsearch:用于数据存储、计算和搜索。
  • Logstash / Beats:用于数据收集。
  • Kibana:用于数据可视化。

整套技术栈被称为 ELK,经常用来做日志收集、系统监控和状态分析等等。

Elasticsearch 安装(这里我们采用的是 elasticsearch 的 7.12.1 版本,由于 8 以上版本的 Java API 变化很大,在企业中应用并不广泛,企业中应用较多的还是 8 以下的版本。):

docker run -d \
  --name es \
  -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
  -e "discovery.type=single-node" \
  -v es-data:/usr/share/elasticsearch/data \
  -v es-plugins:/usr/share/elasticsearch/plugins \
  --privileged \
  --network hm-net \
  -p 9200:9200 \
  -p 9300:9300 \
  elasticsearch:7.12.1

安装完成后,访问 虚拟机ip:9200,即可看到响应的 Elasticsearch 服务的基本信息。

通过下面的 Docker 命令,即可部署 Kibana(图形界面工具,帮助我们更好地操作 Elasticsearch):

docker run -d \
--name kibana \
-e ELASTICSEARCH_HOSTS=http://es:9200 \
--network=hm-net \
-p 5601:5601  \
kibana:7.12.1

安装完成后,直接访问 虚拟机ip:5601,即可看到控制台页面,选中 Dev tools,进入开发工具页面。

倒排索引

正向索引

倒排索引的概念是基于 MySQL 这样的正向索引而言的。传统数据库(如 MySQL)采用的是正向索引,例如给下表 tb_goods

id title price
1 小米手机 3499
2 华为手机 4999
3 华为小米充电器 49
4 小米手环 49

其中的 id 字段已经创建了索引,由于索引底层采用了 B+ 树结构,因此我们根据 id 搜索的速度会非常快。但是其他字段例如 title,只在叶子节点上存在。因此要根据 title 搜索的时候只能遍历树中的每一个叶子节点,判断 title 数据是否符合要求。

综上,根据 id 精确匹配时,可以走索引,查询效率较高。而当搜索条件为模糊匹配时,由于索引无法生效,导致从索引查询退化为全表扫描,效率很差。

因此,正向索引适合于根据索引字段的精确搜索,不适合基于部分词条的模糊匹配。而倒排索引恰好解决的就是根据部分词条模糊匹配的问题

倒排索引

倒排索引中有两个非常重要的概念:

  • 文档(Document):用来搜索的数据,其中的每一条数据就是一个文档。例如一个网页、一个商品信息。
  • 词条(Term):对文档数据或用户搜索数据,利用某种算法分词,得到的具备含义的词语就是词条。例如:我是中国人,就可以分为:我、是、中国人、中国、国人这样的几个词条。

创建倒排索引是对正向索引的一种特殊处理和应用,流程如下:

  • 将每一个文档的数据利用分词算法根据语义拆分,得到一个个词条。
  • 创建表,每行数据包括词条、词条所在文档 id、位置等信息。
  • 因为词条唯一性,可以给词条创建正向索引。

此时形成的这张以词条为索引的表,就是倒排索引表,两者对比如下:

正向索引

id(索引) title price
1 小米手机 3499
2 华为手机 4999
3 华为小米充电器 49
4 小米手环 49

倒排索引

词条(索引) 文档id
小米 1,3,4
手机 1,2
华为 2,3
充电器 3
手环 4

查询的时候是先查倒排索引,再查正向索引。虽然要先查询倒排索引,再查询正向索引,但是无论是词条、还是文档 id 都建立了索引,查询速度非常快!无需全表扫描。

IK 分词器

中文分词往往需要根据语义分析,比较复杂,这就需要用到中文分词器,例如 IK 分词器。IK 分词器是林良益在2006 年开源发布的,其采用的正向迭代最细粒度切分算法一直沿用至今。

首先查看 es-plugins 挂载的地方:

docker volume inspect es-plugins

“Mountpoint” 后方的目录即为数据卷的目录地址。接下来,把插件复制到数据卷目录里然后重启 es 即可:

docker restart es

IK分词器包含两种模式:

  • ik_smart:智能语义切分
  • ik_max_word:最细粒度切分

我们在 Kibana 的 DevTools 上来测试分词器,首先测试 Elasticsearch 官方提供的标准分词器:

POST /_analyze
{
  "analyzer": "standard",
  "text": "学习java太棒了"
}

结果是:标准分词器智能 1 字 1 词条,无法正确对中文做分词。我们再测试 IK 分词器:

POST /_analyze
{
  "analyzer": "ik_smart",
  "text": "学习java太棒了"
}
POST /_analyze
{
  "analyzer": "ik_max_word",
  "text": "学习java太棒了"
}

测试后发现,IK 分词器可以成功分词。

注意:

  • ik_smart 是智能分词,尽量选择最像一个词的拆分方式。
  • ik_max_word 是尽可能地分词,可以包括组合词。

使用的时候:

  • ik_smart:用于搜索分词,即在查询时使用,保证性能的同时提供合理的分词精度。
  • ik_max_word:适用于底层索引分词,确保在建立索引时尽可能多地分词,提高查询时的匹配度和覆盖面。

不过,随着互联网的发展,“造词运动” 也越发的频繁。出现了很多新的词语,在原有的词汇列表中并不存在。IK 分词器无法对这些词汇分词。要想正确分词,IK 分词器的词库也需要不断的更新,IK 分词器提供了扩展词汇的功能:

  1. 打开 IK 分词器 config 目录(/es-plugins/_data/ik/cofig/IKAnalyzer.cfg.xml)。

  2. IKAnalyzer.cfg.xml 中添加如下配置:

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
    <properties>
            <comment>IK Analyzer 扩展配置</comment>
            <!--用户可以在这里配置自己的扩展字典 *** 添加扩展词典-->
            <entry key="ext_dict">ext.dic</entry>
    </properties>
  3. 在 IK 分词器的 config 目录新建一个 ext.dic,可以参考 config 目录下复制一个配置文件进行修改:

    新增词汇1
    新增词汇2
  4. 最后重启 elasticsearch 即可。

基础概念

数据库和 Elasticsearch 的概念对比如下:

MySQL Elasticsearch 说明
Table Index 索引,就是文档的集合,类似于数据库的表。
Row Document 文档,就是一条条的数据,类似数据库中的行,文档都是 JSON 格式。
Column Field 字段,就是 JSON 文档中的字段,类似于数据库中的列。
Schema Mapping 映射,就是索引中文档的约束,例如字段类型约束。类似于数据库的表结构。
SQL DSL DSL 是 Elasticsearch 提供的 JSON 风格的请求语句,用来定义搜索条件。

索引库操作

Mapping 映射属性

索引类似于数据库的表概念,创建表需要指定表的约束,自然,创建索引也需要指定索引的映射。Mapping 是对索引库中文档的约束,常见的 mapping 属性包括:

  • type:字段数据类型,常见的简单类型有:
    • 字符串:text(可分词的文本)、keyword(精确值,例如:品牌、国家、ip 地址)
    • 数值:longintegershortbytedoublefloat。(如果是数字数组,则指定元素类型即可,无需指定成数组类型)
    • 布尔:boolean
    • 日期:date
    • 对象:object
  • index:是否创建索引,默认为true
  • analyzer:使用哪种分词器。
  • properties:该字段的子字段。

索引库操作

由于 Elasticsearch 采用的是 Restful 风格的 API,因此其请求方式和路径相对都比较规范,而且请求参数也都采用JSON 风格。我们直接基于 Kibana 的 DevTools 来编写请求做测试,由于有语法提示,会非常方便。

创建索引库

基本语法:

  • 请求方式:PUT
  • 请求路径:/索引库名,可以自定义。
  • 请求参数:mapping 映射。

格式:

PUT /索引库名称
{
  "mappings": {
    "properties": {
      "字段名":{
        "type": "text",
        "analyzer": "ik_smart"
      },
      "字段名2":{
        "type": "keyword",
        "index": "false"	// 只有当index为false的时候才需要指定,默认为true
      },
      "字段名3":{
        "type": "object",
        "properties": {
          "子字段": {
            "type": "keyword"
          }
        }
      },
      // ...略
    }
  }
}
查询索引库

基本语法:

  • 请求方式:GET
  • 请求路径:/索引库名
  • 请求参数:无。

格式:

GET /索引库名
修改索引库

倒排索引结构虽然不复杂,但是一旦数据结构改变(比如改变了分词器),就需要重新创建倒排索引,这简直是灾难。因此索引库一旦创建,无法修改 mapping

虽然无法修改 mapping 中已有的字段,但是却允许添加新的字段到 mapping 中,因为不会对倒排索引产生影响。因此修改索引库能做的就是向索引库中添加新字段,或者更新索引库的基础属性。

格式:

PUT /索引库名/_mapping
{
  "properties": {
    "新字段名":{
      "type": "integer"
    }
  }
}
删除索引库

基本语法:

  • 请求方式:DELETE
  • 请求路径:/索引库名
  • 请求参数:无。

格式:

DELETE /索引库名

文档操作

文档 CRUD

文档的操作同样遵循 RestfulAPI 风格。

新增文档

格式如下:

POST /索引库名/_doc/文档id
{
    "字段1": "值1",
    "字段2": "值2",
    "字段3": {
        "子属性1": "值3",
        "子属性2": "值4"
    },
}
查询文档

格式如下:

GET /{索引库名称}/_doc/{id}
删除文档

格式如下:

DELETE /{索引库名}/_doc/id值
修改文档

修改有两种方式:

  • 全量修改:直接覆盖原来的文档,先把旧文档删掉(如果没有相对应的旧文档,这一步不会执行),然后添加新文档。

    PUT /{索引库名}/_doc/文档id
    {
        "字段1": "值1",
        "字段2": "值2",
        // ... 略
    }
  • 局部修改:修改文档中的部分字段。

    POST /{索引库名}/_update/文档id
    {
        "doc": {
             "字段名": "新的值",
        }
    }

批量处理

批处理采用 POST 请求,基本语法如下:

POST _bulk
{ "index" : { "_index" : "索引库名", "_id" : "1" } }	
{ "field1" : "value1" }								 
{ "delete" : { "_index" : "索引库名", "_id" : "2" } } 
{ "create" : { "_index" : "索引库名", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "索引库名"} }
{ "doc" : {"field2" : "value2"} }

其中:

  • index 代表新增操作:
    • _index:指定索引库名。
    • _id:指定要操作的文档 id。
    • { "field1" : "value1" }:则是要新增的文档内容。
  • delete 代表删除操作:
    • _index:指定索引库名。
    • _id:指定要操作的文档 id。
  • update 代表更新操作:
    • _index:指定索引库名。
    • _id:指定要操作的文档 id。
    • { "doc" : {"field2" : "value2"} }:要更新的文档字段。

Elasticsearch 高级

JavaRestClient

ES 官方提供了各种不同语言的客户端,用来操作 ES。这些客户端的本质就是组装 DSL 语句,通过 http 请求发送给 ES。(官方文档戳我访问

由于 ES 目前最新版本是 8.x,提供了全新版本的客户端,老版本的客户端已经被标记为过时。而我们采用的是 7.12版本,因此只能使用老版本客户端。

使用之前需要引入依赖:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>

因为 SpringBoot 默认的 ES 版本是 7.17.10,所以我们需要覆盖默认的 ES 版本:

<properties>
  <maven.compiler.source>11</maven.compiler.source>
  <maven.compiler.target>11</maven.compiler.target>
  <elasticsearch.version>7.12.1</elasticsearch.version>
</properties>

初始化客户端:

public class ElasticTest {

    private RestHighLevelClient client;

    @Test
    void testConnection() {
        // 测试连接是否成功
        System.out.println(client);
    }

    @BeforeEach
    void setUp() {
        client = new RestHighLevelClient(RestClient.builder(
                HttpHost.create("http://192.168.23.130:9200")
        ));
    }

    @AfterEach
    void tearDown() throws IOException {
        if (client != null) {
            client.close();
        }
    }

}

索引库操作

// 新增索引库
@Test
void testCreateIndex() throws IOException {
    // 准备Request对象
    CreateIndexRequest request = new CreateIndexRequest("items");   // items为索引库的名称

    // 准备请求参数,MAPPING_TEMPLATE是创建索引库的json语法(去除PUT行)
    request.source(MAPPING_TEMPLATE, XContentType.JSON);

    // 发送请求
    client.indices()    // indices也是一个客户端,具有CRUD的各种操作
            .create(request, RequestOptions.DEFAULT);
}

// 查询索引库
@Test
void getIndex() throws IOException {
    GetIndexRequest request = new GetIndexRequest("items"); // 注意这里是Get

    GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);// 调exists方法也可以

    System.out.println(response);
}

// 删除索引库
@Test
void testDeleteIndex() throws IOException {
    DeleteIndexRequest request = new DeleteIndexRequest("items"); // 注意这里是Delete

    client.indices().delete(request, RequestOptions.DEFAULT);
}

文档操作

新增文档如下:

@Test
void testIndexDoc() throws IOException {
    // 准备Request对象
    IndexRequest request = new IndexRequest("items").id("1");

    // 准备请求参数
    request.source("{datajson}", XContentType.JSON);

    // 发送请求
    client.index(request, RequestOptions.DEFAULT);
}

删除文档如下:

@Test
void testDeleteDocument() throws IOException {
    // 准备Request,两个参数,第一个是索引库名,第二个是文档id
    DeleteRequest request = new DeleteRequest("item", "100002644680");
    // 发送请求
    client.delete(request, RequestOptions.DEFAULT);
}

查询文档如下,需要额外处理数据解析过程:

@Test
void testGetDoc() throws IOException {
    // 使用request对象发送请求
    GetRequest request = new GetRequest("items", "317578");

    GetResponse response = client.get(request, RequestOptions.DEFAULT);

    // 解析结果
    String json = response.getSourceAsString();
    ItemDoc itemDoc = JSONUtil.toBean(json, ItemDoc.class);

    System.out.println(itemDoc);
}

对于修改文档,之前说果,修改文档数据有两种方式:全量更新和局部更新。其中,对于全量更新而言,其 API 与新增文档的 API 一致,这里我们不作演示。我们演示局部更新的 API:

@Test
void testUpdateDocument() throws IOException {
    // 准备Request
    UpdateRequest request = new UpdateRequest("items", "100002644680");
    
    // 准备请求参数
    request.doc(
            "price", 58800,
            "commentCount", 1
    );
    
    // 发送请求
    client.update(request, RequestOptions.DEFAULT);
}

文档批处理

JavaRestClient 提供了批量处理专用的 API——BulkRequest,可以实现批量新增操作:

@Test
void testBulk() throws IOException {
    // 创建Request
    BulkRequest request = new BulkRequest();
    
    // 准备请求参数
    request.add(new IndexRequest("items").id("1").source("json doc1", XContentType.JSON));
    request.add(new IndexRequest("items").id("2").source("json doc2", XContentType.JSON));
    
    // 发送请求
    client.bulk(request, RequestOptions.DEFAULT);
}

当我们要导入业务数据时,由于数据量达到数十万,因此不可能一次性全部导入。建议采用循环遍历方式,每次导入 1000 条左右的数据:

@Test
void testBulkDoc() throws IOException {

    int pageNo = 1, pageSize = 1000;

    while (true) {
        // 准备文档数据,使用分页查询
        Page<Item> page = itemService.lambdaQuery()
                .eq(Item::getStatus, 1)
                .page(Page.of(pageNo, pageSize));

        List<Item> records = page.getRecords();

        if (records == null || records.isEmpty()) {
            return;
        }

        // 准备Request
        BulkRequest bulkRequest = new BulkRequest();
        for (Item item : records) {
            ItemDoc itemDoc = BeanUtil.copyProperties(item, ItemDoc.class);
            bulkRequest.add(new IndexRequest("items")
                    .id(itemDoc.getId())
                    .source(JSONUtil.toJsonStr(itemDoc), XContentType.JSON));
        }

        // 分页递增
        ++pageNo;

    }

}

DSL 查询

Elasticsearch 提供了 DSL (Domain Specific Language)查询,就是以 JSON 格式来定义查询条件。

Elasticsearch 的查询可以分为两大类:

  • 叶子查询(Leaf query clauses):一般是在特定的字段里查询特定值,属于简单查询,很少单独使用。
  • 复合查询(Compound query clauses):以逻辑方式组合多个叶子查询或者更改叶子查询的行为方式。

在查询以后,还可以对查询的结果做处理,包括:排序、分页、高亮、聚合等。

快速入门

基于 DSL 的查询语法如下:

GET /{索引库名}/_search
{
  "query": {
    "查询类型": {
      "查询条件": "条件值"
    }
  }
}

// 示例,查所有:
GET /indexName/_search
{
  "query": {
    "match_all": {}
  }
}

叶子查询

叶子查询可以进一步细分,这里列举一些常见的,例如:

  • 全文检索查询(Full Text Queries):利用分词器对用户输入搜索条件先分词,得到词条,然后再利用倒排索引搜索词条。例如:

    • match

      GET /{索引库名}/_search
      {
        "query": {
          "match": {
            "字段名": "搜索条件"
          }
        }
      }
    • multi_match

      GET /{索引库名}/_search
      {
        "query": {
          "multi_match": {
            "query": "搜索条件",
            "fields": ["字段1", "字段2"]
          }
        }
      }
  • 精确查询(Term-level queries):不对用户输入搜索条件分词,根据字段内容精确值匹配。但只能查找 keyword、数值、日期、boolean 类型的字段。例如:

    • ids

    • term

      GET /{索引库名}/_search
      {
        "query": {
          "term": {
            "字段名": {
              "value": "搜索条件"
            }
          }
        }
      }
    • range

      GET /{索引库名}/_search
      {
        "query": {
          "range": {
            "字段名": {
              "gte": {最小值},
              "lte": {最大值}
            }
          }
        }
      }
  • 地理坐标查询:用于搜索地理位置,搜索方式很多,例如:

    • geo_bounding_box:按矩形搜索。
    • geo_distance:按点和半径搜索。

复合查询

复合查询大致可以分为两类:

  • 第一类:基于逻辑运算组合叶子查询,实现组合条件,例如:
    • bool
  • 第二类:基于某种算法修改查询时的文档相关性算分,从而改变文档排名(有时候并不希望和用户搜索匹配相关度高的在前,而是谁掏的钱多谁在前)。例如:
    • function_score
    • dis_max

我们这里主要看 bool 查询:bool 查询,即布尔查询。就是利用逻辑运算来组合一个或多个查询子句的组合。bool 查询支持的逻辑运算有:

  • must:必须匹配每个子查询,类似 “与”。
  • should:选择性匹配子查询,类似 “或”。
  • must_not:必须不匹配,不参与算分,类似 “非”。
  • filter:必须匹配,不参与算分

bool 查询的语法示例如下:

GET /items/_search
{
  "query": {
    "bool": {
      "must": [
        {"match": {"name": "手机"}}
      ],
      "should": [
        {"term": {"brand": { "value": "vivo" }}},
        {"term": {"brand": { "value": "小米" }}}
      ],
      "must_not": [
        {"range": {"price": {"gte": 2500}}}
      ],
      "filter": [
        {"range": {"price": {"lte": 1000}}}
      ]
    }
  }
}

出于性能考虑,与搜索关键字无关的查询尽量采用 must_notfilter 逻辑运算,避免参与相关性算分。

排序

elasticsearch 默认是根据相关度算分(_score)来排序,但是也支持自定义方式对搜索结果排序。不过分词字段无法排序,能参与排序字段类型有:keyword 类型、数值类型、地理坐标类型、日期类型等。

语法说明:

GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "排序字段": {
        "order": "排序方式asc和desc"
      }
    }
  ]
}

// 简化方式
GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "字段1": "asc"
    },
    {
      "字段2": "desc"
    }
  ]
}

分页

elasticsearch 默认情况下只返回 top10 的数据。而如果要查询更多数据就需要修改分页参数了。elasticsearch 通过修改 from、size 参数来控制要返回的分页结果:

  • from:从第几个文档开始。
  • size:总共查询几个文档。
GET /indexName/_search
{
  "query": {
    "match_all": {}
  },
  "from": 0, // 分页开始的位置,默认为0
  "size": 10,  // 每页文档数量,默认10
  "sort": [
    {
      "price": {
        "order": "desc"
      }
    }
  ]
}

elasticsearch 的数据一般会采用分片存储,也就是把一个索引中的数据分成 N 份,存储到不同节点上。这种存储方式比较有利于数据扩展,但给分页带来了一些麻烦。

实际上,很好理解,比如:在一所学校里,对学生进行了班级划分(比如 30 个学生为一个班级,假设我们划分了 10 个班级),这就类似于对数据的分片存储。假设现在我们对这所学校的学生进行 “分页查询” —— 查找出成绩在前 10 名的学生。此时,你没办法一次性直接查出,因为这前 10 名学生分散到了各个班级里,并且不能保证每个班级的第一名一定就落在整个年级的前十名里。你只能把每个班级前 10 名的学生按成绩排序,最后取前 10 名。

由此可知,当查询分页深度较大时,汇总数据过多,对内存和 CPU 会产生非常大的压力。

因此 elasticsearch 会禁止 from+size 超过 10000 的请求。

针对深度分页,elasticsearch 提供了两种解决方案:

  • search after:分页时需要排序,原理是从上一次的排序值开始,查询下一页数据。官方推荐使用的方式。优点:没有查询上限,支持深度分页;缺点:只能向后逐页查询,不能随机翻页。使用场景:数据迁移、手机滚动查询。
  • scroll:原理将排序后的文档 id 形成快照,保存下来,基于快照做分页。官方已经不推荐使用。

大多数情况下,我们采用普通分页就可以了。查看百度、京东等网站,会发现其分页都有限制。例如百度最多支持 77 页,每页不足 20 条。京东最多 100 页,每页最多 60 条。

因此,一般我们采用限制分页深度的方式即可,无需实现深度分页。

高亮显示

什么是高亮显示呢?我们在百度,京东搜索时,关键字会变成红色,比较醒目,这叫高亮显示。观察页面源码,我们发现两件事情:

  • 高亮词条都被加了 <em> 标签。
  • <em> 标签都添加了红色样式。

css 样式肯定是前端实现页面的时候写好的,但是前端编写页面的时候是不知道页面要展示什么数据的,不可能给数据加标签。而服务端实现搜索功能,要是有 elasticsearch 做分词搜索,是知道哪些词条需要高亮的。

因此词条的高亮标签肯定是由服务端提供数据的时候已经加上的

因此实现高亮的思路就是:

  • 用户输入搜索关键字搜索数据。
  • 服务端根据搜索关键字到 elasticsearch 搜索,并给搜索结果中的关键字词条添加 html 标签。
  • 前端提前给约定好的 html 标签添加 CSS 样式。

事实上 elasticsearch 已经提供了给搜索关键字加标签的语法,无需我们自己编码。

基本语法如下:

GET /{索引库名}/_search
{
  "query": {
    "match": {
      "搜索字段": "搜索关键字"
    }
  },
  "highlight": {
    "fields": {
      "高亮字段名称": {
        // 没有特殊要求不需要指定pre_tags和post_tags,默认就是<em></em>
        "pre_tags": "<em>",
        "post_tags": "</em>"
      }
    }
  }
}

注意

  • 搜索必须有查询条件,而且是全文检索类型的查询条件,例如 match
  • 参与高亮的字段必须是 text 类型的字段。
  • 默认情况下参与高亮的字段要与搜索字段一致,除非添加:required_field_match=false

JavaRestClient 查询

快速入门

数据搜索的 Java 代码分为两部分:

  • 构建并发起请求。

    @Test
    void testMatchAll() throws IOException {
        // 创建request对象
        SearchRequest request = new SearchRequest("items");
    
        // 配置request参数
        /*
                GET /items/_search
                {
                   "query": {
                     "match_all": {}
                   }
                }
         */
        request.source()
                .query(QueryBuilders.matchAllQuery());
    
        // 发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    
        // ...
    }
  • 解析查询结果。

    @Test
    void testMatchAll() throws IOException {
        // ...
    
        // 发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    
        // 解析结果
        SearchHits searchHits = response.getHits();
    
        // 总条数
        long total = searchHits.getTotalHits().value;
        // 命中的数据
        SearchHit[] hits = searchHits.getHits();
        for (SearchHit hit : hits) {
            // 解析source结果
            String json = hit.getSourceAsString();
            // 转换为对应对象
            ItemDoc doc = JSONUtil.toBean(json, ItemDoc.class);
            // ...
        }
    }

构建查询条件

所有的查询条件都是由 QueryBuilders 来构建的,叶子查询也不例外。因此整套代码中变化的部分仅仅是 query 条件构造的方式,其它不动。

全文检索的查询条件构建 API 如下:

// 单字段查询
QueryBuilders.matchQuery("name", "脱脂牛奶");

// 多字段查询
QueryBuilders.multiMatchQuery("脱脂牛奶", "name", "category");

精确查询的查询条件构建 API 如下:

// 词条查询
QueryBuilders.termQuery("category", "牛奶");

// 范围查询
QueryBuilders.rangeQuery("price").gte(100).lte(150);

布尔查询的查询条件构建 API 如下:

// 创建bool查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

// 添加must条件
boolQuery.must(QueryBuilders.termQuery("brand", "华为"));

// 添加filter条件
boolQuery.filter(QueryBuilders.rangeQuery("price").lte(2500));

例如,使用 JavaRestClient 实现搜索功能,条件如下:

  • 搜索关键字为脱脂牛奶。
  • 品牌必须为德亚。
  • 价格必须低于 300。

示例:

@Test
void testSearch() throws IOException {
    // 创建request对象
    SearchRequest request = new SearchRequest("items");

    // 组织DSL参数
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

    boolQuery.must(QueryBuilders.matchQuery("name", "脱脂牛奶"))
             .filter(QueryBuilders.termQuery("brand", "德亚"))
             .filter(QueryBuilders.rangeQuery("price").lte(300));

    request.source().query(boolQuery);

    // 发送请求
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);

    // 解析结果
    parseResponseResult(response);
}

排序和分页

对于 elasticsearch,requeset.source() 就是整个请求 JSON 参数,所以排序、分页都是基于这个来设置:

@Test
void testPageAndSort() throws IOException {
    int pageNo = 1, pageSize = 5;

    // 创建Request
    SearchRequest request = new SearchRequest("items");
    
    // 搜索条件参数
    request.source().query(QueryBuilders.matchQuery("name", "脱脂牛奶"));
    
    // 排序参数
    request.source().sort("price", SortOrder.ASC);
    // 分页参数
    request.source().from((pageNo - 1) * pageSize).size(pageSize);
    
    // 发送请求
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    
    // 解析结果
    parseResponseResult(response);
}

高亮显示

高亮查询与前面的查询有两点不同:

  • 条件同样是在 request.source() 中指定,只不过高亮条件要基于 HighlightBuilder 来构造。

    @Test
    void testHighLight() throws IOException {
        // 创建request对象
        SearchRequest request = new SearchRequest("items");
    
        // query条件
        request.source().query(QueryBuilders.matchQuery("name", "拉杆箱"));
        // 高亮条件
        request.source().highlighter(
                SearchSourceBuilder.highlight().field("name")
        );
    
        // 发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    
        // 解析结果
        handleHighLightResult(response);
    }
  • 高亮响应结果与搜索的文档结果不在一起,需要单独解析。

    private void handleHighLightResult(SearchResponse response) {
        SearchHits searchHits = response.getHits();
    
        // 获取总条数
        long total = searchHits.getTotalHits().value;
        System.out.println("共搜索到" + total + "条数据");
    
        // 遍历结果数组
        SearchHit[] hits = searchHits.getHits();
        for (SearchHit hit : hits) {
            // 得到_source,也就是原始json文档
            String source = hit.getSourceAsString();
            // 反序列化
            ItemDoc item = JSONUtil.toBean(source, ItemDoc.class);
            // 获取高亮结果
            Map<String, HighlightField> hfs = hit.getHighlightFields();
            if (CollUtils.isNotEmpty(hfs)) {
                // 有高亮结果,获取name的高亮结果
                HighlightField hf = hfs.get("name");
                if (hf != null) {
                    // 获取第一个高亮结果片段,就是商品名称的高亮值
                    String hfName = hf.getFragments()[0].string();
                    item.setName(hfName);
                }
            }
            System.out.println(item);
        }
    }

数据聚合

聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。

聚合常见的有三类:

  • 桶(Bucket)聚合:用来对文档做分组 :
  • TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组。
  • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组。
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等:
  • Avg:求平均值。
  • Max:求最大值。
  • Min:求最小值。
  • Stats:同时求 maxminavgsum 等。
  • 管道(Pipeline)聚合:其它聚合的结果为基础做进一步运算 。

注意:参加聚合的字段必须是 keyword、日期、数值、布尔类型。

DSL 聚合

例如我们要统计所有商品中共有哪些商品分类,其实就是以分类(category)字段对数据分组。category 值一样的放在同一组,属于 Bucket 聚合中的 Term 聚合。

GET /items/_search
{
  "size": 0, 	// 设置size为0,结果中不包含文档,只包含聚合结果
  "aggs": {		// 定义聚合
    "category_agg": {	// 给聚合起个名字
      "terms": {	    // 聚合的类型,按照品牌值聚合,所以选择term
        "field": "category",	// 参与聚合的字段
        "size": 20			    // 希望获取的聚合结果数量
      }
    }
  }
}

默认情况下,Bucket 聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加 query 条件即可。例如,要查出价格高于 3000 元的手机品牌有哪些,我们需要先查询过滤,再进行聚合:

GET /items/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "category": "手机"
          }
        },
        {
          "range": {
            "price": {
              "gte": 300000
            }
          }
        }
      ]
    }
  }, 
  "size": 0, 
  "aggs": {
    "brand_agg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}

除了对数据分组(Bucket)以外,我们还可以对每个 Bucket 内的数据进一步做数据计算和统计。例如:要查出手机有哪些品牌,每个品牌的价格最小值、最大值、平均值:

GET /items/_search
{
  "query": {
    "term": {
      "category": "手机"
    }
  }, 
  "size": 0, 
  "aggs": {
    "brand_agg": {
      "terms": {
        "field": "brand",
        "size": 20
      },
      "aggs": {
        "stats_meric": {
          "stats": {
            "field": "price"
          }
        }
      }
    }
  }
}

JavaRestClient 聚合

聚合条件的要利用 AggregationBuilders 这个工具类来构造。DSL 与 Java API 的语法对比如下:

/*
        GET /indexName/_search
        {
          "size": 0,
          "aggs": {
            "brand_agg": {
              "terms": {
                "field": "brand",
                "size": 20
              }
            }
          }
        }
*/
request.source().size(0);
request.source().aggregation(
	AggregationBuilders.terms("brand_agg").field("brand").size(20)
);

聚合结果与搜索文档同一级别,因此需要单独获取和解析。具体解析语法如下:

// 解析聚合结果
Aggregations aggregations = response.getAggregations();

// 根据名称获取聚合结果
Terms brandTerms = aggregations.get("brand_agg");

// 获取桶
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();

// 遍历
for (Terms.Bucket bucket : buckets) {
    // 获取key,也就是品牌信息
    String brandName = bucket.getKeyAsString();
    //...
}

完整代码如下:

@Test
void testAgg() throws IOException {
    // 创建Request
    SearchRequest request = new SearchRequest("items");
    
    // 准备请求参数
    BoolQueryBuilder bool = QueryBuilders.boolQuery()
            .filter(QueryBuilders.termQuery("category", "手机"))
            .filter(QueryBuilders.rangeQuery("price").gte(300000));
    request.source().query(bool).size(0);
    
    // 聚合参数
    request.source().aggregation(
            AggregationBuilders.terms("brand_agg").field("brand").size(5)
    );
    
    // 发送请求
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    
    // 解析聚合结果
    Aggregations aggregations = response.getAggregations();
    
    // 获取品牌聚合
    Terms brandTerms = aggregations.get("brand_agg");
    
    // 获取聚合中的桶
    List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
    
    // 遍历桶内数据
    for (Terms.Bucket bucket : buckets) {
        // 5.4.获取桶内key
        String brand = bucket.getKeyAsString();
        System.out.print("brand = " + brand);
        long count = bucket.getDocCount();
        System.out.println("; count = " + count);
    }
}

文章作者: 热心市民灰灰
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 热心市民灰灰 !
  目录