Sentinel实现动态配置的集群流控的方法

介绍

为什么要使用集群流控呢?

相对于单机流控而言,我们给每台机器设置单机限流阈值,在理想情况下整个集群的限流阈值为机器数量✖️单机阈值。不过实际情况下流量到每台机器可能会不均匀,会导致总量没有到的情况下某些机器就开始限流。因此仅靠单机维度去限制的话会无法精确地限制总体流量。而集群流控可以精确地控制整个集群的调用总量,结合单机限流兜底,可以更好地发挥流量控制的效果。

基于单机流量不均的问题以及如何设置集群整体的QPS的问题,我们需要创建一种集群限流的模式,这时候我们很自然地就想到,可以找一个 server 来专门统计总的调用量,其它的实例都与这台 server 通信来判断是否可以调用。这就是最基础的集群流控的方式。

原理

集群限流的原理很简单,和单机限流一样,都需要对 qps 等数据进行统计,区别就在于单机版是在每个实例中进行统计,而集群版是有一个专门的实例进行统计。

这个专门的用来统计数据的称为 Sentinel 的 token server,其他的实例作为 Sentinel 的 token client 会向 token server 去请求 token,如果能获取到 token,则说明当前的 qps 还未达到总的阈值,否则就说明已经达到集群的总阈值,当前实例需要被 block,如下图所示:

和单机流控相比,集群流控中共有两种身份:

  • Token Client:集群流控客户端,用于向所属 Token Server 通信请求 token。集群限流服务端会返回给客户端结果,决定是否限流。
  • Token Server:即集群流控服务端,处理来自 Token Client 的请求,根据配置的集群规则判断是否应该发放 token(是否允许通过)。

而单机流控中只有一种身份,每个 sentinel 都是一个 token server。

注意,集群限流中的 token server 是单点的,一旦 token server 挂掉,那么集群限流就会退化成单机限流的模式。

Sentinel 集群流控支持限流规则和热点规则两种规则,并支持两种形式的阈值计算方式:

  • 集群总体模式:即限制整个集群内的某个资源的总体 qps 不超过此阈值。
  • 单机均摊模式:单机均摊模式下配置的阈值等同于单机能够承受的限额,token server 会根据连接数来计算总的阈值(比如独立模式下有 3 个 client 连接到了 token server,然后配的单机均摊阈值为 10,则计算出的集群总量就为 30),按照计算出的总的阈值来进行限制。这种方式根据当前的连接数实时计算总的阈值,对于机器经常进行变更的环境非常适合。

部署方式

token server 有两种部署方式:

一种是独立部署,就是单独启动一个 token server 服务来处理 token client 的请求,如下图所示:

如果独立部署的 token server 服务挂掉的话,那其他的 token client 就会退化成本地流控的模式,也就是单机版的流控,所以这种方式的集群限流需要保证 token server 的高可用性。

一种是嵌入部署,即作为内置的 token server 与服务在同一进程中启动。在此模式下,集群中各个实例都是对等的,token server 和 client 可以随时进行转变,如下图所示:

嵌入式部署的模式中,如果 token server 服务挂掉的话,我们可以将另外一个 token client 升级为token server来,当然啦如果我们不想使用当前的 token server 的话,也可以选择另外一个 token client 来承担这个责任,并且将当前 token server 切换为 token client。Sentinel 为我们提供了一个 api 来进行 token server 与 token client 的切换:

http://<ip>:<port>/setClusterMode?mode=<xxx>

其中 mode 为 0 代表 client,1 代表 server,-1 代表关闭。

PS:注意应用端需要引入集群限流客户端或服务端的相应依赖。

集群限流控制台

sentinel为用户提供集群限流控制台功能,能够通过控制台配置集群的限流规则以及配置集群的Server与Client。

集群限流客户端

要想使用集群限流功能,必须引入集群限流 client 相关依赖:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-client-default</artifactId>
    <version>1.8.0</version>
</dependency>

集群限流服务端

要想使用集群限流服务端,必须引入集群限流 server 相关依赖:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-server-default</artifactId>
    <version>1.8.0</version>
</dependency>

我们结合server和client实现一个嵌入式模式。在pom中同时引入上面的两个依赖,并配置sentinel控制台地址,实现一个查询订单的接口。

pom

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-server-default</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-client-default</artifactId>
</dependency>

application.yml

server:
  port: 9091

spring:
  application:
    name: cloudalibaba-sentinel-clusterServer
  cloud:
    sentinel:
      transport:
        #配置sentinel dashboard地址
        dashboard: localhost:8080
        port: 8719 #默认8719端口

OrderController

@RestController
public class OrderController {
    /**
     * 查询订单
     * @return
     */
    @GetMapping("/order/{id}")
    public CommonResult<Order> getOrder(@PathVariable("id") Long id){

        Order order = new Order(id, "212121");
        return CommonResult.success(order.toString());
    }
}

代码示例如cloudalibaba-sentinel-cluster-embedded9091

修改VM options配置,启动三个不同端口的实例,即可。

-Dserver.port=9091 -Dproject.name=cloudalibaba-sentinel-clusterServer -Dcsp.sentinel.log.use.pid=true
-Dserver.port=9092 -Dproject.name=cloudalibaba-sentinel-clusterServer -Dcsp.sentinel.log.use.pid=true
-Dserver.port=9093 -Dproject.name=cloudalibaba-sentinel-clusterServer -Dcsp.sentinel.log.use.pid=true 

控制台配置

登录sentinel的控制台,并有访问量后,我们就可以在 Sentinel上面看到集群流控,如下图所示:

点击添加Token Server。

从实例列表中选择一个作为Server端,其他作为Client端,并选中到右侧Client列表,配置token sever端的最大允许的QPS,用于对 Token Server 的资源使用进行限制,防止在嵌入模式下影响应用本身。

配置完成之后的Token Server列表,如下图所示

使用控制台配置token Server、token Client以及限流规则,有很多的缺点:

1、限流规则,不能持久化,应用重启之后,规则丢失。

2、token Server 、token Client配置也会丢失。

官方推荐给集群限流服务端注册动态配置源来动态地进行配置。我们使用nacos作为配置中心,动态配置客户端与服务端属性以及限流规则,实现动态集群限流。

sentinel结合nacos实现集群限流

我们使用Nacos对cloudalibaba-sentinel-cluster-embedded9091进行改造,实现动态配置源来动态进行配置。

配置源注册的相关逻辑可以置于 InitFunc 实现类中,并通过 SPI 注册,在 Sentinel 初始化时即可自动进行配置源加载监听。

嵌入模式部署

添加ClusterInitFunc类

public class ClusterInitFunc implements InitFunc {

    //应用名称
    private static final String APP_NAME = AppNameUtil.getAppName();

    //nacos集群地址
    private final String remoteAddress = "localhost:8848";

    //nacos配置的分组名称
    private final String groupId = "SENTINEL_GROUP";

    //配置的dataId
    private final String flowDataId = APP_NAME + Constants.FLOW_POSTFIX;
    private final String paramDataId = APP_NAME + Constants.PARAM_FLOW_POSTFIX;
    private final String configDataId = APP_NAME + Constants.CLIENT_CONFIG_POSTFIX;
    private final String clusterMapDataId = APP_NAME + Constants.CLUSTER_MAP_POSTFIX;

    private static final String SEPARATOR = "@";

    @Override
    public void init() {
        // Register client dynamic rule data source.
        //动态数据源的方式配置sentinel的流量控制和热点参数限流的规则。
        initDynamicRuleProperty();

        // Register token client related data source.
        // Token client common config
        // 集群限流客户端的配置属性
        initClientConfigProperty();
        // Token client assign config (e.g. target token server) retrieved from assign map:
        //初始化Token客户端
        initClientServerAssignProperty();

        // Register token server related data source.
        // Register dynamic rule data source supplier for token server:
        //集群的流控规则,比如限制整个集群的流控阀值,启动的时候需要添加-Dproject.name=项目名
        registerClusterRuleSupplier();
        // Token server transport config extracted from assign map:
        //初始化server的端口配置
        initServerTransportConfigProperty();

        // Init cluster state property for extracting mode from cluster map data source.
        //初始化集群中服务是客户端还是服务端
        initStateProperty();
    }

    private void initDynamicRuleProperty() {

        //流量控制的DataId分别是APP_NAME + Constants.FLOW_POSTFIX;热点参数限流规则的DataId是APP_NAME + Constants.PARAM_FLOW_POSTFIX;

        ReadableDataSource<String, List<FlowRule>> ruleSource = new NacosDataSource<>(remoteAddress, groupId,
            flowDataId, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
        FlowRuleManager.register2Property(ruleSource.getProperty());

        ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new NacosDataSource<>(remoteAddress, groupId,
            paramDataId, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
        ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
    }

    private void initClientConfigProperty() {
        ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new NacosDataSource<>(remoteAddress, groupId,
            configDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {}));
        ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
    }

    private void initServerTransportConfigProperty() {
        ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new NacosDataSource<>(remoteAddress, groupId,
            clusterMapDataId, source -> {
            List<ClusterGroupEntity> groupList = new Gson().fromJson(source, new TypeToken<List<ClusterGroupEntity>>(){}.getType());
            return Optional.ofNullable(groupList)
                .flatMap(this::extractServerTransportConfig)
                .orElse(null);
        });
        ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
    }

    private void registerClusterRuleSupplier() {
        // Register cluster flow rule property supplier which creates data source by namespace.
        // Flow rule dataId format: ${namespace}-flow-rules
        ClusterFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
                namespace + Constants.FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
            return ds.getProperty();
        });
        // Register cluster parameter flow rule property supplier which creates data source by namespace.
        ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<ParamFlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
                namespace + Constants.PARAM_FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
            return ds.getProperty();
        });
    }

    private void initClientServerAssignProperty() {
        // Cluster map format:
        // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","serverId":"112.12.88.68@8728","port":11111}]
        // serverId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
        ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new NacosDataSource<>(remoteAddress, groupId,
            clusterMapDataId, source -> {
            List<ClusterGroupEntity> groupList = new Gson().fromJson(source, new TypeToken<List<ClusterGroupEntity>>(){}.getType());
            return Optional.ofNullable(groupList)
                .flatMap(this::extractClientAssignment)
                .orElse(null);
        });
        ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
    }

    private void initStateProperty() {
        // Cluster map format:
        // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","serverId":"112.12.88.68@8728","port":11111}]
        // serverId: <ip@commandPort>, commandPort for port exposed to Sentinel dashboard (transport module)
        ReadableDataSource<String, Integer> clusterModeDs = new NacosDataSource<>(remoteAddress, groupId,
            clusterMapDataId, source -> {
            List<ClusterGroupEntity> groupList = new Gson().fromJson(source, new TypeToken<List<ClusterGroupEntity>>(){}.getType());
            return Optional.ofNullable(groupList)
                .map(this::extractMode)
                .orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
        });
        ClusterStateManager.registerProperty(clusterModeDs.getProperty());
    }

    private int extractMode(List<ClusterGroupEntity> groupList) {
        // If any server group serverId matches current, then it's token server.
        if (groupList.stream().anyMatch(this::machineEqual)) {
            return ClusterStateManager.CLUSTER_SERVER;
        }
        // If current machine belongs to any of the token server group, then it's token client.
        // Otherwise it's unassigned, should be set to NOT_STARTED.
        boolean canBeClient = groupList.stream()
            .flatMap(e -> e.getClientSet().stream())
            .filter(Objects::nonNull)
            .anyMatch(e -> e.equals(getCurrentMachineId()));
        return canBeClient ? ClusterStateManager.CLUSTER_CLIENT : ClusterStateManager.CLUSTER_NOT_STARTED;
    }

    private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) {
        return groupList.stream()
            .filter(this::machineEqual)
            .findAny()
            .map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));
    }

    private Optional<ClusterClientAssignConfig> extractClientAssignment(List<ClusterGroupEntity> groupList) {
        if (groupList.stream().anyMatch(this::machineEqual)) {
            return Optional.empty();
        }
        // Build client assign config from the client set of target server group.
        for (ClusterGroupEntity group : groupList) {
            if (group.getClientSet().contains(getCurrentMachineId())) {
                String ip = group.getIp();
                Integer port = group.getPort();
                return Optional.of(new ClusterClientAssignConfig(ip, port));
            }
        }
        return Optional.empty();
    }

    private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {
        return getCurrentMachineId().equals(group.getServerId());
    }

    private String getCurrentMachineId() {
        // Note: this may not work well for container-based env.
        return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getRuntimePort();
    }
}

在resources文件夹下创建META-INF/service,,然后创建一个叫做com.alibaba.csp.sentinel.init.InitFunc的文件,在文件中指名实现InitFunc接口的类全路径,内容如下:

com.liang.springcloud.alibaba.init.ClusterInitFunc

添加配置的解析类:

public class ClusterGroupEntity implements Serializable {

    private String serverId;
    private String ip;
    private Integer port;
    private Set<String> clientSet;

    public String getServerId() {
        return serverId;
    }

    public void setServerId(String serverId) {
        this.serverId = serverId;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public Set<String> getClientSet() {
        return clientSet;
    }

    public void setClientSet(Set<String> clientSet) {
        this.clientSet = clientSet;
    }

    @Override
    public String toString() {
        return "ClusterGroupEntity{" +
                "serverId='" + serverId + '\'' +
                ", ip='" + ip + '\'' +
                ", port=" + port +
                ", clientSet=" + clientSet +
                '}';
    }
}

在Nacos中添加动态规则配置,以及token server与token client的配置:

DataId:cloudalibaba-sentinel-clusterServer-flow-rules Group:SENTINEL_GROUP 配置内容(json格式):

[
    {
        "resource" : "/order/{id}",     // 限流的资源名称
        "grade" : 1,                         // 限流模式为:qps,线程数限流0,qps限流1
        "count" : 20,                        // 阈值为:20
        "clusterMode" :  true,               // 是否是集群模式,集群模式为:true
        "clusterConfig" : {
            "flowId" : 111,                  // 全局唯一id
            "thresholdType" : 1,             // 阈值模式为:全局阈值,0是单机均摊,1是全局阀值
            "fallbackToLocalWhenFail" : true // 在 client 连接失败或通信失败时,是否退化到本地的限流模式
        }
    }
]

DataId:cloudalibaba-sentinel-clusterServer-cluster-client-config Group:SENTINEL_GROUP 配置内容(json格式):

{
    "requestTimeout": 20
}

DataId:cloudalibaba-sentinel-clusterServer-cluster-map Group:SENTINEL_GROUP 配置内容(json格式):

[{
	"clientSet": ["10.133.40.30@8721", "10.133.40.30@8722"],
	"ip": "10.133.40.30",
	"serverId": "10.133.40.30@8720",
	"port": 18730   //这个端口是token server通信的端口
}]

重新启动服务,并访问接口,我们可以看到流控规则与集群流控都自动配置完成。我们需要测试,我们集群流控是否已经生效。

不断执行以下命令:

ab -n 100 -c 50 http://localhost:9091/order/1
ab -n 100 -c 50 http://localhost:9092/order/3
ab -n 100 -c 50 http://localhost:9093/order/1

测试效果图:

我们从实时监控图上可以看出,资源名为/order/{id},整个集群的QPS为20,跟我们的配置是一样的。当作为token server的机器挂掉后,集群限流会退化到 local 模式的限流,即在本地按照单机阈值执行限流检查。

Token Server 分配配置:

上面这张图可以很好帮忙我们解释嵌入模式的具体实现。通过配置信息解析,管理我们的token server与token client。

适用范围:

嵌入模式适合某个应用集群内部的流控。由于隔离性不佳,token server会影响应用本身,需要限制 token server 的总QPS。

独立模式部署

独立模式相对于嵌入模式而言就是将token server与应用隔离,进行独立部署。将嵌入模式中token server和token client分离,分别进行配置。我们只需要将 InitFunc 实现类进行拆分。

token server的nacos配置

server的名称空间配置,(集群的namespace或客户端项目名)如下:

DataId:cluster-server-namespace-set Group:SENTINEL_ALONE_GROUP 配置内容(json格式):

[
    "cloudalibaba-sentinel-cluster-client-alone"
]

server的通信端口配置,如下:

DataId:cluster-server-transport-config Group:SENTINEL_ALONE_GROUP 配置内容(json格式):

{
 "idleSecods":600,
 "port": 18730
}

Token sever的流控限制配置,如下:

DataId:cluster-server-flow-config Group:SENTINEL_ALONE_GROUP 配置内容(json格式):

{
    "exceedCount":1.0,
    "maxAllowedQps":20000,
    "namespace":"cloudalibaba-sentinel-cluster-client-alone"
}

token server的host地址与端口号配置,如下:

DataId: cluster-server-config Group:SENTINEL_ALONE_GROUP 配置内容(json格式):

{
    "serverHost": "10.133.40.30",
    "serverPort": 18730
}

token server的InitFunc类:

/**
 * @PROJECT_NAME: SpringCloud-Learning
 * @USER: yuliang
 * @DESCRIPTION:
 * @DATE: 2021-04-01 10:01
 */
public class ClusterServerInitFunc implements InitFunc {

    //nacos集群地址
    private final String remoteAddress = "localhost:8848";
    //配置的分组名称
    private final String groupId = "SENTINEL_ALONE_GROUP";

    //配置的dataId
    private final String namespaceSetDataId = "cluster-server-namespace-set";
    private final String serverTransportDataId = "cluster-server-transport-config";
    private final String serverFlowDataId = "cluster-server-flow-config";

    @Override
    public void init() {

        //监听特定namespace(集群的namespace或客户端项目名)下的集群限流规则
        initPropertySupplier();
        // 设置tokenServer管辖的作用域(即管理哪些应用)
        initTokenServerNameSpaces();

        // Server transport configuration data source.
        //Server端配置
        initServerTransportConfig();

        // 初始化最大qps
        initServerFlowConfig();

        //初始化服务器状态
        initStateProperty();

    }

    private  void initPropertySupplier(){

        // Register cluster flow rule property supplier which creates data source by namespace.
        // Flow rule dataId format: ${namespace}-flow-rules
        ClusterFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
                    namespace + Constants.FLOW_POSTFIX,
                    source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
            return ds.getProperty();
        });
        // Register cluster parameter flow rule property supplier which creates data source by namespace.
        ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<ParamFlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
                    namespace + Constants.PARAM_FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
            return ds.getProperty();
        });

    }

    private void initTokenServerNameSpaces(){
        // Server namespace set (scope) data source.
        ReadableDataSource<String, Set<String>> namespaceDs = new NacosDataSource<>(remoteAddress, groupId,
                namespaceSetDataId, source -> JSON.parseObject(source, new TypeReference<Set<String>>() {}));
        ClusterServerConfigManager.registerNamespaceSetProperty(namespaceDs.getProperty());
    }

    private void initServerTransportConfig(){
        // Server transport configuration data source.
        ReadableDataSource<String, ServerTransportConfig> transportConfigDs = new NacosDataSource<>(remoteAddress,
                groupId, serverTransportDataId,
                source -> JSON.parseObject(source, new TypeReference<ServerTransportConfig>() {}));
        ClusterServerConfigManager.registerServerTransportProperty(transportConfigDs.getProperty());
    }

    private void initServerFlowConfig(){

        // Server namespace set (scope) data source.
        ReadableDataSource<String, ServerFlowConfig> serverFlowConfig = new NacosDataSource<>(remoteAddress, groupId,
                serverFlowDataId, source -> JSON.parseObject(source, new TypeReference<ServerFlowConfig>() {}));

        ClusterServerConfigManager.registerGlobalServerFlowProperty(serverFlowConfig.getProperty());
    }

    private void initStateProperty() {
        ClusterStateManager.applyState(ClusterStateManager.CLUSTER_SERVER);

    }
}

token client的nacos配置

客户端请求超时配置,如下:

DataId:cluster-client-config Group:SENTINEL_ALONE_GROUP 配置内容(json格式):

{
    "requestTimeout": 20
}

流控限流配置,如下:

DataId: cloudalibaba-sentinel-cluster-client-alone-flow-rules Group:SENTINEL_ALONE_GROUP 配置内容(json格式):

[
    {
        "resource" : "/order/{id}",     // 限流的资源名称
        "grade" : 1,                         // 限流模式为:qps
        "count" : 30,                        // 阈值为:30
        "clusterMode" :  true,               // 集群模式为:true
        "clusterConfig" : {
            "flowId" : 111,                  // 全局唯一id
            "thresholdType" : 1,             // 阈值模式为:全局阈值
            "fallbackToLocalWhenFail" : true // 在 client 连接失败或通信失败时,是否退化到本地的限流模式
        }
    }
]

热点限流配置,如下:

DataId:cloudalibaba-sentinel-cluster-client-alone-param-rules Group:SENTINEL_ALONE_GROUP 配置内容(json格式):

[
    {
        "resource" : "order",          // 限流的资源名称
        "paramIdx" : 1,                      //参数索引
        "grade" : 1,                         // 限流模式为:qps
        "count" : 10,                        // 阈值为:10
        "clusterMode" :  true,               // 集群模式为:true
        "clusterConfig" : {
            "flowId" : 121,                  // 全局唯一id
            "thresholdType" : 1,             // 阈值模式为:全局阈值
            "fallbackToLocalWhenFail" : true // 在 client 连接失败或通信失败时,是否退化到本地的限流模式
        },
        "paramFlowItemList":[      //索引为1的参数值为hot时,接口阈值为50,其他值均为10
            {
                object: "hot",
                count: 50,
                classType: "java.lang.String"
            }
        ]
    }
]

Token client的InitFunc类:

/**
 * @PROJECT_NAME: SpringCloud-Learning
 * @USER: yuliang
 * @DESCRIPTION:
 * @DATE: 2021-04-01 17:47
 */
public class ClusterClientInitFunc implements InitFunc {

    //项目名称
    private static final String APP_NAME = AppNameUtil.getAppName();
    //nacos集群地址
    private final String remoteAddress = "localhost:8848";
    //nacos配置的分组名称
    private final String groupId = "SENTINEL_ALONE_GROUP";

    //项目名称 + Constants的配置名称,组成配置的dataID
    private final String flowDataId = APP_NAME + Constants.FLOW_POSTFIX;
    private final String paramDataId = APP_NAME + Constants.PARAM_FLOW_POSTFIX;
    private final String configDataId = "cluster-client-config";
    private final String serverDataId =  "cluster-server-config";

    @Override
    public void init() throws Exception {

        // Register client dynamic rule data source.
        //客户端,动态数据源的方式配置sentinel的流量控制和热点参数限流的规则。
        initDynamicRuleProperty();

        // Register token client related data source.
        // Token client common config
        // 集群限流客户端的配置属性
        initClientConfigProperty();
        // Token client assign config (e.g. target token server) retrieved from assign map:
        //初始化Token客户端
        initClientServerAssignProperty();

        //初始化客户端状态
        initStateProperty();
    }

    private void initDynamicRuleProperty() {

        //流量控制的DataId分别是APP_NAME + Constants.FLOW_POSTFIX;热点参数限流规则的DataId是APP_NAME + Constants.PARAM_FLOW_POSTFIX;

        ReadableDataSource<String, List<FlowRule>> ruleSource = new NacosDataSource<>(remoteAddress, groupId,
                flowDataId, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
        FlowRuleManager.register2Property(ruleSource.getProperty());

        ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new NacosDataSource<>(remoteAddress, groupId,
                paramDataId, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {}));
        ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
    }

    private void initClientConfigProperty() {
        ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new NacosDataSource<>(remoteAddress, groupId,
                configDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {}));
        ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
    }

    private void initClientServerAssignProperty() {
        ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new NacosDataSource<>(remoteAddress, groupId,
                serverDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientAssignConfig>() {}));
        ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
    }

    private void initStateProperty() {
        ClusterStateManager.applyState(ClusterStateManager.CLUSTER_CLIENT);

    }
}

核心的代码与配置,如上所示,其他代码,可以访问:

<module>cloudalibaba-sentinel-cluster-server-alone9092</module>
<module>cloudalibaba-sentinel-cluster-client-alone9093</module>

测试:

启动cloudalibaba-sentinel-cluster-server-alone9092,我们启动两个实例,模拟集群(可以启动多个):

-Dserver.port=9092 -Dcsp.sentinel.log.use.pid=true
-Dserver.port=9094 -Dcsp.sentinel.log.use.pid=true

启动cloudalibaba-sentinel-cluster-client-alone9093,我们启动1个实例,模拟server(实现master选举之后,可以启动多个):

-Dserver.port=9093 -Dcsp.sentinel.log.use.pid=true

不断执行以下命令,进行接口访问测试:

ab -n 100 -c 50 http://localhost:9092/order/1
ab -n 100 -c 50 http://localhost:9094/order/3

我们从实时监控图上可以看出,资源名为/order/{id},整个集群的QPS为30,跟我们的配置是一样的。当作为token server的机器挂掉后,集群限流会退化到 local 模式的限流,即在本地按照单机阈值执行限流检查。

热点限流已经为大家实现了,大家可以自行测试,比较简单,不再累述。

ab -n 100 -c 50  http://localhost:9092/hot_order/1/hot
ab -n 100 -c 50  http://localhost:9094/hot_order/1/hot

ab -n 100 -c 50  http://localhost:9092/hot_order/1/nothot
ab -n 100 -c 50  http://localhost:9094/hot_order/1/nothot

其它

若在生产环境使用集群限流,管控端还需要关注以下的问题:

  • Token Server 自动管理、调度(分配/选举 Token Server)
  • Token Server 高可用,在某个 server 不可用时自动 failover 到其它机器

总结

集群流控,有两种模式,嵌入模式和独立模式,个人不建议在业务系统使用集群流控,集群流控可以在网关层做,业务层的话可以使用单机流控,相对来说简单好上手。token server目前存在单点问题,需要个人实现master选举,并修改 cluster-server-config的IP即可。

代码示例

本文示例读者可以通过查看下面仓库中的项目,如下所示:

<module>cloudalibaba-sentinel-cluster</module>

Github:https://github.com/jiuqiyuliang/SpringCloud-Learning

到此这篇关于Sentinel实现动态配置的集群流控的文章就介绍到这了,更多相关Sentinel集群流控内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • spring cloud gateway整合sentinel实现网关限流

    这篇文章主要介绍了spring cloud gateway整合sentinel实现网关限流,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 说明: sentinel可以作为各微服务的限流,也可以作为gateway网关的限流组件. spring cloud gateway有限流功能,但此处用sentinel来作为替待. 说明:sentinel流控可以放在gateway网关端,也可以放在各微服务端. 1,以父工程为基础,创建子工程 2,添加pom依赖

  • 玩转Redis搭建集群之Sentinel详解

    前言 Redis作为内存数据库,需要具备高可用的特点,不然如果服务器宕机,还在内存里的数据就会丢失.我们最常用的高可用方法就是搭建集群,master机器挂了,可以让slave机器顶上,继续提供服务.但是Redis集群是不会自动进行主从切换的,也就是说,如果主节点非常不争气的在凌晨3点挂了,那么运维同学就要马上起床,把从节点改成主节点,这样的操作是非常繁琐低效的.为此,Redis官方提供了一种解决方案:Redis Sentinel 简介 Redis Sentinel集群通常由3到5个节点组成,如果

  • 基于docker搭建redis-sentinel集群的方法示例

    1.概述 Redis 集群可以在一组 redis 节点之间实现高可用性和 sharding.在集群中会有 1 个 master 和多个 slave 节点.当 master 节点失效时,应选举出一个 slave 节点作为新的 master.然而 Redis 本身(包括它的很多客户端)没有实现自动故障发现并进行主备切换的能力,需要外部的监控方案来实现自动故障恢复. Redis Sentinel 是官方推荐的高可用性解决方案.它是 Redis 集群的监控管理工具,可以提供节点监控.通知.自动故障恢复和

  • Sentinel实现动态配置的集群流控的方法

    介绍 为什么要使用集群流控呢? 相对于单机流控而言,我们给每台机器设置单机限流阈值,在理想情况下整个集群的限流阈值为机器数量✖️单机阈值.不过实际情况下流量到每台机器可能会不均匀,会导致总量没有到的情况下某些机器就开始限流.因此仅靠单机维度去限制的话会无法精确地限制总体流量.而集群流控可以精确地控制整个集群的调用总量,结合单机限流兜底,可以更好地发挥流量控制的效果. 基于单机流量不均的问题以及如何设置集群整体的QPS的问题,我们需要创建一种集群限流的模式,这时候我们很自然地就想到,可以找一个 s

  • Linux中安装配置hadoop集群详细步骤

    一. 简介 参考了网上许多教程,最终把hadoop在ubuntu14.04中安装配置成功.下面就把详细的安装步骤叙述一下.我所使用的环境:两台ubuntu 14.04 64位的台式机,hadoop选择2.7.1版本.(前边主要介绍单机版的配置,集群版是在单机版的基础上,主要是配置文件有所不同,后边会有详细说明) 二. 准备工作 2.1 创建用户 创建用户,并为其添加root权限,经过亲自验证下面这种方法比较好. sudo adduser hadoop sudo vim /etc/sudoers

  • 在CentOS中安装Rancher2并配置kubernetes集群的图文教程

    准备 一台CentOS主机,安装DockerCE,用于安装Rancher2 一台CentOS主机,安装DockerCE,用于安装kubernetes集群管理主机 多台CentOS主机,安装DockerCE,用于运行kubernetes工作节点,工作节点需要与集群管理主机在同一个子网中 掌握Docker常用操作,了解K8s基本原理 安装Rancher2 第一步:执行命令,运行Rancher2,绑定主机端口80和443. docker run -d --restart=unless-stopped

  • Nacos配置中心集群原理及源码分析

    目录 Nacos集群工作原理 配置变更同步入口 AsyncNotifyService AsyncTask 目标节点接收请求 NacosDelayTaskExecuteEngine ProcessRunnable processTasks DumpProcessor.process Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢? 下面这个图,表示Nacos集群的部署图. Nacos集群工作原理 Nacos作为配置中心的集群结构中,是一种无中心化节点的设计

  • 关于Java中配置ElasticSearch集群环境账号密码的问题

    1.修改主站点的elasticsearch.yml添加一下行: xpack.security.enabled: true 2.生成安全秘钥 切到ES安装目录,执行bin/elasticsearch-certutil ca -out config/elastic-certificates.p12 -pass “” 会在/home/elasticsearch-7.9.3/config目录生成elastic-certificates.p12 3.继续修改ES yml文件 添加以下四行: xpack.s

  • PHP实现分布式memcache设置web集群session同步的方法

    本文实例讲述了PHP实现分布式memcache设置web集群session同步的方法. php的session默认是文件存储: session.save_handler = files session.save_path = "/var/lib/php/session" 当做web集群,需要session同步时,将session存到分布式memcache来达到共享同步是个不错的办法 方法: 第1种: vi /etc/php.ini session.save_handler = memc

  • docker安装Elasticsearch7.6集群并设置密码的方法步骤

    目录 一些基础配置 关于版本和docker镜像 开始 关于elasticsearch.yml 关于证书elastic-certificates.p12 生成密码 使用密码 忘记密码 Elasticsearch从6.8开始, 允许免费用户使用X-Pack的安全功能, 以前安装es都是裸奔.接下来记录配置安全认证的方法. 为了简化物理安装过程,我们将使用docker安装我们的服务. 一些基础配置 es需要修改linux的一些参数. 设置vm.max_map_count=262144 sudo vim

  • 在centos 7中安装配置k8s集群的步骤详解

    配置背景介绍 kubernetes是google开源的容器集群管理系统,提供应用部署.维护.扩展机制等功能,利用kubernetes能方便管理跨集群运行容器化的应用,简称:k8s(k与s之间有8个字母) 为什么要用kubernetes这么复杂的docker集群管理工具呢?一开始接触了docker内置的swarm,这个工具非常简单快捷的完成docker集群功能.但是在使用docker1.13内置的swarm做集群的时候遇到vip负载均衡没有正确映射端口到外网,或者出现地址被占用的情况,这对高可用性

  • Windows Server 2003 下配置 MySQL 集群(Cluster)教程

    MySQL 群集是 MySQL 适合于分布式计算环境的高可用.高冗余版本.它采用了 NDB Cluster 存储引擎,允许在 1 个群集中运行多个 MySQL 服务器.在 MySQL 5.0 及以上的二进制版本中,以及与最新的 Linux 版本兼容的 RPM 包中提供了该存储引擎. MySQL 群集是一种技术,该技术允许在无共享的系统中部署"内存中"和"磁盘中"数据库的 Cluster .通过无共享体系结构,系统能够使用廉价的硬件,而且对软硬件无特殊要求.此外,由于

  • Springcloud Eureka配置及集群代码实例

    springcloud微服务包含的技术种类众多,eureka作为其注册中心,一直处于主流,但在今年已经处于永久停更状态,但其优秀的能力还是值得学习. 整体价格采用聚合工程,后续也存在于聚合工程内. 1.首先配置pom工程的依赖 <dependencies> <!-- eureka-server --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId&

随机推荐