Sliding-window rate limiter component: a custom starter based on Redisson

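In high-concurrency scenarios, we need to limit how many requests of the same kind are accepted per unit of time, so that a burst of requests in a short period does not put too much pressure on the system.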

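This component wraps Sentinel, Redis, and Redisson. The rate limiter described here is implemented with Redisson, which uses Redis for distributed rate limiting and is well suited to high-concurrency, distributed environments.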

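For the underlying principle, see the write-up on GitHub, which explains it well: Redisson分布式限流器RRateLimiter原理 (how Redisson's distributed rate limiter RRateLimiter works).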
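To make the sliding-window idea concrete before looking at the Redisson version, here is a minimal in-memory sketch. It is not Redisson's implementation: the class name InMemorySlidingWindowLimiter and the injectable clock are assumptions made for illustration, and it only works inside a single JVM.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * In-memory sliding-window-log limiter, for illustration only.
 * This is NOT Redisson's implementation; all names here are hypothetical.
 */
class InMemorySlidingWindowLimiter {

    // Timestamps (in milliseconds) of the requests accepted for each key
    private final Map<String, Deque<Long>> accepted = new HashMap<>();
    // Injectable clock so the behavior can be checked without sleeping
    private final LongSupplier clockMillis;

    InMemorySlidingWindowLimiter(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    synchronized boolean tryAcquire(String key, int limit, int windowSeconds) {
        long now = clockMillis.getAsLong();
        long windowStart = now - windowSeconds * 1000L;
        Deque<Long> timestamps = accepted.computeIfAbsent(key, k -> new ArrayDeque<>());

        // Drop accepted requests that have slid out of the window
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= windowStart) {
            timestamps.pollFirst();
        }
        if (timestamps.size() >= limit) {
            return false; // the window is full, reject the request
        }
        timestamps.addLast(now);
        return true;
    }
}
```

Passing System::currentTimeMillis as the clock gives real-time behavior. The Redisson-based limiter keeps its state in Redis instead, which is what lets several service instances share one limit.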

<dependencies>

    <dependency>
        <groupId>cn.mifu</groupId>
        <artifactId>nft-turbo-cache</artifactId>
    </dependency>

    <!--    Sentinel    -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-sentinel-gateway</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
    </dependency>

    <!--    Sentinel integration with Nacos for persistent configuration    -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-sentinel-datasource</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba.csp</groupId>
        <artifactId>sentinel-datasource-nacos</artifactId>
        <version>1.8.7</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>

We define a general-purpose sliding-window rate limiter:

import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.api.RedissonClient;

/**
 * Sliding-window rate limiting service
 *
 * @author mifuRD
 */
public class SlidingWindowRateLimiter implements RateLimiter {

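    private final RedissonClient redissonClient;

    public SlidingWindowRateLimiter(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    @Override
    public Boolean tryAcquire(String key, int limit, int windowSize) {
        RRateLimiter rRateLimiter = redissonClient.getRateLimiter(key);

        // Configure the limiter only on first use. Once it exists, the limit and
        // windowSize passed by later calls for the same key are ignored.
        if (!rRateLimiter.isExists()) {
            rRateLimiter.trySetRate(RateType.OVERALL, limit, windowSize, RateIntervalUnit.SECONDS);
        }

        // Non-blocking: returns immediately with true (allowed) or false (rejected)
        return rRateLimiter.tryAcquire();
    }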
}
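The RateLimiter interface that this class implements is not shown above. Judging from the overridden method, it presumably looks something like the sketch below. This is a reconstruction, not the original source, and NoOpRateLimiter is a hypothetical stub added only to show why coding against the interface is convenient (for example, in tests that run without Redis).

```java
/**
 * Hypothetical reconstruction of the RateLimiter interface, inferred from the
 * overridden tryAcquire signature; the original interface is not shown.
 */
interface RateLimiter {

    /**
     * @param key        identifies the resource being limited
     * @param limit      maximum number of requests allowed per window
     * @param windowSize window length in seconds
     * @return true if the request is allowed, false if it is rejected
     */
    Boolean tryAcquire(String key, int limit, int windowSize);
}

/** Hypothetical stub that never limits, e.g. for tests that run without Redis. */
class NoOpRateLimiter implements RateLimiter {

    @Override
    public Boolean tryAcquire(String key, int limit, int windowSize) {
        return Boolean.TRUE;
    }
}
```

Callers that depend on RateLimiter rather than on SlidingWindowRateLimiter directly can be handed either implementation.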

How the rate-limiting method works

1. Obtain the rate limiter instance:

RRateLimiter rRateLimiter = redissonClient.getRateLimiter(key);

2. Initialize the rate limiter configuration:

if (!rRateLimiter.isExists()) {
    rRateLimiter.trySetRate(RateType.OVERALL, limit, windowSize, RateIntervalUnit.SECONDS);
}

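Check whether the rate limiter already exists, and configure it only if it does not.

RateType.OVERALL means the limit is shared by all clients, that is, a cluster-wide rate-limiting strategy.

The rate is set so that at most limit requests are allowed within windowSize seconds.

3. Try to acquire a permit: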

return rRateLimiter.tryAcquire();

Try to acquire a permit from the rate limiter; return true on success and false otherwise.
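One consequence of step 2 is easy to miss: because the limiter is configured only when it does not yet exist, the first call for a key decides its limit and window, and different values passed later for the same key have no effect while the limiter exists. The in-memory sketch below imitates just that "configure if absent" behavior. RateConfig and RateConfigRegistry are hypothetical names, and nothing here talks to Redis.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical value object: the limit and window (in seconds) stored for one key. */
final class RateConfig {
    final int limit;
    final int windowSeconds;

    RateConfig(int limit, int windowSeconds) {
        this.limit = limit;
        this.windowSeconds = windowSeconds;
    }
}

/** Hypothetical in-memory stand-in for the "configure only if absent" step. */
class RateConfigRegistry {

    private final Map<String, RateConfig> configs = new ConcurrentHashMap<>();

    /** Returns the config in effect: the one stored first, not necessarily the one passed in. */
    RateConfig configureIfAbsent(String key, int limit, int windowSeconds) {
        return configs.computeIfAbsent(key, k -> new RateConfig(limit, windowSeconds));
    }
}
```

Calling configureIfAbsent("testLock", 1, 5) and then configureIfAbsent("testLock", 1, 1) returns the 5-second configuration both times. In practice this suggests using a distinct key for each distinct rule.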

Rate limiter bean configuration

For convenience, we register a bean named slidingWindowRateLimiter:

import cn.mifu.nft.turbo.limiter.SlidingWindowRateLimiter;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author mifuRD
 */
@Configuration
public class RateLimiterConfiguration {

    @Bean
    public SlidingWindowRateLimiter slidingWindowRateLimiter(RedissonClient redisson) {
        return new SlidingWindowRateLimiter(redisson);
    }
}

Then create the file org.springframework.boot.autoconfigure.AutoConfiguration.imports (under META-INF/spring/ on the classpath) with the following content:

cn.mifu.nft.turbo.limiter.configuration.RateLimiterConfiguration

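Unit tests

import cn.mifu.nft.turbo.limiter.SlidingWindowRateLimiter;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author mifuRD
 */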
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RateLimiterTestConfiguration.class})
@ActiveProfiles("test")
public class SlidingWindowRateLimiterTest {

    @Autowired
    SlidingWindowRateLimiter slidingWindowRateLimiter;

    @Test
    @Ignore
    public void tryAcquire1() {
        Boolean result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
        Assert.assertTrue(result);
        result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
        Assert.assertTrue(result);
        result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
        Assert.assertTrue(result);
        result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
        Assert.assertFalse(result);

        try {
            // Wait for the 10-second window to expire
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        result = slidingWindowRateLimiter.tryAcquire("testLock997", 3, 10);
        Assert.assertTrue(result);

    }

    @Test
    @Ignore
    public void tryAcquire() {
        // The first call creates the limiter: 1 request per 5 seconds
        Boolean result = slidingWindowRateLimiter.tryAcquire("testLock", 1, 5);
        Assert.assertTrue(result);
        // The limiter already exists, so the new windowSize (3) is ignored
        Boolean result1 = slidingWindowRateLimiter.tryAcquire("testLock", 1, 3);
        Assert.assertFalse(result1);
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // Only 3 of the original 5 seconds have passed, so this is still rejected
        Boolean result2 = slidingWindowRateLimiter.tryAcquire("testLock", 1, 1);
        Assert.assertFalse(result2);

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        // About 5 seconds after the first call, the window has expired
        Boolean result3 = slidingWindowRateLimiter.tryAcquire("testLock", 1, 1);
        Assert.assertTrue(result3);
    }
}

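Usage example

For example, the endpoint that sends SMS verification codes needs rate limiting to prevent abuse.

The goal is to ensure that the same phone number can request an SMS verification code only once per minute.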

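    /**
     * Generates and sends an SMS verification code.
     *
     * @author mifuRD
     * @param telephone the phone number to send the code to
     * @return the result of the send request
     */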
    @Facade
    @Override
    public NoticeResponse generateAndSendSmsCaptcha(String telephone) {
        Boolean access = slidingWindowRateLimiter.tryAcquire(telephone, 1, 60);

        if (!access) {
            throw new SystemException(SEND_NOTICE_DUPLICATED);
        }

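        // Generate the verification code
        String captcha = RandomUtil.randomNumbers(4);

        // Store the verification code in Redis
        redisTemplate.opsForValue().set(CAPTCHA_KEY_PREFIX + telephone, captcha, 5, TimeUnit.MINUTES);

        Notice notice = noticeService.saveCaptcha(telephone, captcha);
        // Send the SMS asynchronously on a virtual thread, then update the send status
        Thread.ofVirtual().start(() -> {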
        Thread.ofVirtual().start(() -> {
            SmsSendResponse result = smsService.sendMsg(notice.getTargetAddress(), notice.getNoticeContent());
            if (result.getSuccess()) {
                notice.setState(NoticeState.SUCCESS);
                notice.setSendSuccessTime(new Date());
                noticeService.updateById(notice);
            } else {
                notice.setState(NoticeState.FAILED);
                notice.addExtendInfo("executeResult", JSON.toJSONString(result));
                noticeService.updateById(notice);
            }
        });

        return new NoticeResponse.Builder().setSuccess(true).build();
    }
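The example above passes the raw phone number as the limiter key. If another feature also limited by phone number, both would share the same limiter and therefore the same rule. One way to avoid that is to namespace the key. The helper below is only a sketch: the class name RateLimitKeys and the prefix are assumptions, not part of the original component.

```java
/** Hypothetical helper for building namespaced rate-limiter keys. */
final class RateLimitKeys {

    // Assumed prefix; pick whatever fits your own key naming scheme
    private static final String SMS_CAPTCHA_PREFIX = "limiter:sms:captcha:";

    private RateLimitKeys() {
        // static helpers only, no instances
    }

    /** Builds the limiter key for sending an SMS verification code to one phone number. */
    static String smsCaptcha(String telephone) {
        return SMS_CAPTCHA_PREFIX + telephone;
    }
}
```

With such a helper, the call in the example would become slidingWindowRateLimiter.tryAcquire(RateLimitKeys.smsCaptcha(telephone), 1, 60).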