源码阅读-Guava RateLimiter

Posted by keys961 on January 3, 2020

1. 限流算法

Guava的RateLimiter是用于限流的,其基于一个限流的算法。

而限流算法有很多,不过主要分为2大类:

  • 漏桶算法
  • 令牌桶算法

1.1. 漏桶算法

其思路很简单,它维护2个变量:

  • rate:桶漏出的频率
  • capacity:桶容量

请求会暂时落入桶中,从而占用桶的容量。而桶以rate频率漏出请求,漏出的请求就可以被执行。当桶溢出的时候,即请求速率过快,请求会被阻塞/拒绝。

由于桶漏出的速率是固定的,当突发流量来的时候,速率会被强制限制到rate,这样效率不高。

1.2. 令牌桶算法

其思路也很简单,但思路却和漏桶算法相反。

它也维护2个变量:

  • rate:往桶加入token的频率(若QPS为$Q$,则每过$\frac{1}{Q}$秒往桶中放1个token),也即限制的流量频率
  • capacity:桶中token的最大数量

请求到来后,会尝试从桶中获取token,获取到才能继续执行下去。若token获取不到,请求会被阻塞/拒绝。

令牌桶算法的好处有:

  • 突发流量到来后,只会当桶中token耗尽后,才会限制流量到rate,相对高效
  • 限流设定可以随时更改,只需更改加入token的频率即可,即rate

而Guava的RateLimiter就是基于该算法。

2. Guava RateLimiter 概述

RateLimiter的API是非常简单的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// RateLimiter的创建,必须指定流量频率permitsPerSecond
static RateLimiter create(double permitsPerSecond); // (1)
static RateLimiter create(double permitsPerSecond, Duration warmupPeriod); // (2)
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit); // (3)
// 获取批准,阻塞
double acquire();
double acquire(int permits);
// 尝试获取批准,非阻塞/超时检测
boolean tryAcquire();
boolean tryAcquire(int permits);
boolean tryAcquire(long timeout, TimeUnit unit);
boolean tryAcquire(int permits, long timeout, TimeUnit unit);
// 限流频率参数的获取与设置
double getRate();
void setRate(double permitsPerSecond);

而查看源码,RateLimiter实际上有2个实现:

  • SmoothBursty:上面第1个create创建的限流器
  • SmoothWarmingUp:上面第2、3个create创建的限流器,带有预热功能

它们都有共同的基类SmoothRateLimiter,而它的基类就是RateLimiter

3. 基类:RateLimiter & SmoothRateLimiter

RateLimiter实际上只有2个字段:

  • stopWatch:一个StopWatch实例,作为计时器。它从0开始计数(实际上它就是读System.nanoTimes()
  • mutexDoNotUseDirectly:一个Object实例,是一个单例,作为monitor用于上锁(mutex()方法会返回它,然后利用synchronized关键字上锁)

SmoothRateLimiter有4个字段:

  • storedPermits:还有多少个permits没被使用

  • maxPermits:桶中最大存放的permits

  • stableIntervalMicros:每隔多少时间产生1个permits,单位微秒

  • nextFreeTicketMicros:下一次可以获取permits的时间

    storedPermits够的时候,直接相减即可;若不够,则需要将该值往后推,表示我预占了多少时间的permits的量。

    当下一个请求来的时候,若没有达到这个时间,线程就会sleep,并也将该值往后推。

有一个疑问:桶中的permits是如何被添加的?实际上,不需要另一个线程,当请求permits时,只要同步一下,根据时间重新计算桶中的permits数量就可以了。

4. 实现:SmoothBursty

SmoothBursty是一个限流器的简单实现,它适用于一些突发流量来临的情况,这是因为它能缓存一定数量的permits以拿来使用和预占(当permits不够时)。

4.1. 创建

SmoothBursty的创建使用的是第2节中的第1个创建方法,代码如下:

1
2
3
4
5
6
7
8
9
public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

注意SmoothBursty构造函数的第2个参数maxBurstSeconds,它代表桶中最多存放多少时间的permits。这里默认为1秒,且不能修改,意思为桶中最多存放1 * permitsPerSecondpermits

SmoothBursty的字段也只有maxBurstSeconds,所以代码略。

4.2. 设置流量速率

4.1.创建SmoothBursty时,设置了初始速率permitsPerSecond,设置的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// In RateLimiter
public final void setRate(double permitsPerSecond) {
    // ...
    synchronized (mutex()) {
        // 上锁设置速率,参数分别是: 速率、当前时间(微秒)
        doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
}

// In SmoothRateLimiter
final void doSetRate(double permitsPerSecond, long nowMicros) {
    // 1. 重新计算桶中的permits
    resync(nowMicros);
    // 2. 设置生成1个permits的时间间隔
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    // 3. 重新设置maxPermits,并更新桶中的permits个数(这里是和旧速率按比例缩放)
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

// In SmoothBursty
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = this.maxPermits;
    // 1. 重新设置maxPermits
    maxPermits = maxBurstSeconds * permitsPerSecond;
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        storedPermits = maxPermits;
    } else {
        // 2. 按比例缩放桶中的permits个数,因为设置的速率改变了
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
    }
}

设置速率的操作非常简单,整个操作是上锁的,因此没有同步问题。其步骤如下:

  • 同步/重新计算桶中的permits个数(重新生成permits,即storedPermits
  • 计算并设置生成1个permits的时间间隔(stableIntervalMicros
  • 重新设置速率和桶permits上限(maxPermits),并等比缩放已有的permitsstoredPermits

4.3. 同步/重新计算桶中存放的permits个数

同步桶中存放permits个数,并同步nextFreeTicketMicros,主要是通过resync(long)方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// In SmoothRateLimiter
void resync(long nowMicros) {
    // 若当前时间大于nextFreeTicketMicros
    // 即很久没有从桶中获取permits/tokens,或者需要第一次计算桶中的permits/tokens
    // 就:1. 需要重新计算/生成桶中的permits
    // 2. 将nextFreeTicketMicros推后到现在
    if (nowMicros > nextFreeTicketMicros) {
        // 1. 计算新增的permits个数
        double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
        // 2. 将新增的permits添加到桶中,但不能超过上限
        storedPermits = min(maxPermits, storedPermits + newPermits);
        // 3. 将nextFreeTicketMicros推后到现在
        nextFreeTicketMicros = nowMicros;
    }
}

这里,只有当nowMicros > nextFreeTicketMicros才能重新计算(因为不这样,说明桶中没permits了,自然不用重新计算)。同步的步骤主要是3步:

  • 计算新增的permits个数
  • 将新增的permits添加到桶中,但不能超过上限
  • nextFreeTicketMicros推后到现在

而这里有一个方法coolDownIntervalMicros(),这里先不用管它,在SmoothWarmingUp中会说明:

  • 在这里,该方法直接返回stableIntervalMicros

注意到,创建限流器,第一次只需resync(long)时,stableIntervalMicros为0,得到的newPermitsDouble.POSITIVE_INFINITY,但是没关系,storedPermits依旧是0。当从限流器获取permits时,resync(long)依旧会被调用,此时stableIntervalMicros已经被设置,storedPermits会被重新计算成一个正常值。

4.4. 获取permits/tokens

这里就是RateLimiter的几个acquire方法了,这里挑选下面这个:

1
2
3
4
5
6
7
8
public double acquire(int permits) {
    // 1. 预约permits,若获取不到足够的,则返回要sleep的时间
    long microsToWait = reserve(permits);
    // 2. 等待第1步计算的时间(若获取到足够的,则不会sleep)
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    // 3. 返回获取permits时sleep的时间,单位为秒
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

这里最重要的就是reserve(int)方法,它加锁最后调用了reserveEarliestAvailable(int, long)方法,用于获取permits(即令牌桶的令牌),若获取不到足够的,则返回需要睡眠的时间。具体可看下面的代码解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
        // 上锁,获取permits,返回等待/睡眠的时间
        return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
    // momentAvailable代表本次请求permits可用的时间
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    // 和当前时间相减,得到要睡眠的时间,睡眠后,permits就有了
    return max(momentAvailable - nowMicros, 0);
}

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 1. 更新storedPermits和nextFreeTicketMicros(若很久没有获取令牌时)
    resync(nowMicros);
    // 直接返回旧的nextFreeTicketMicros
    long returnValue = nextFreeTicketMicros;
    // 当前可用的permits数
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // 需要另外获取的permits数
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // 2. 计算不够的部分,需要等待多少时间
    long waitMicros =
        // 这里该方法返回0,从桶中获取permits不需要另外等待
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);
	// 3. 把nextFreeTicketMicros往后推(如果permits不够需要等待)
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    // 4. 拿走permits
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}

可以见到,代码还是很简单的。

不过需要注意的一点:假如storePermits比较小,某一个请求requirePermits > storePermits时,由于返回的是nextFreeTicketMicros旧值,从而本请求睡眠时间为0,直接返回了;不过该操作会顺延nextFreeTicketMicros,当下一个请求到来时,根据顺延的nextFreeTicketMicros,请求会睡眠一段时间,从而限制了速度。

所以可以说,在SmoothBursty中:

  • requirePermits过大时,请求是会预占后面的permits,当前请求的速度影响较小
  • 但是由于之前请求的预占,会延长后一个请求获取permits的时间,从而限制流量

因此,它和传统令牌桶算法有一定的区别,但大致相同

5. 实现:SmoothWarmingUp

SmoothBursty不同,SmoothWarmingUp适用于一些需要预热的场景,如数据库等等。

例如某个服务限流1000QPS,我们不能让系统立刻达到1000QPS流量,而是逐步提高流量限制,用于预热,最后到达流量上限最大值。

从4.4.中的reserveEarliestAvailable(int, long)方法中,有一个方法storedPermitsToWaitTime(double, double),它用于计算从桶中获取已有permits所需要的时间:

  • SmoothBursty:返回0,即从桶中取已有permits不需要等待
  • SmoothWarmingUp:可能返回大于0,在预热阶段,从桶中取已有permits需要等待一定的时间

这就是两者最大的不同。

不过首先,要说明一下预热的概念。

5.1. 预热图解

首先要上一个javadoc中的一张图:

warming_up

说明一下上图:

  • 横轴:表示桶中已暂存的permits

    • thresholePermits:暂存permits数量的阈值。当暂存数量小于该值时,说明桶中permits数量少,流量大,即充分预热;当暂存数量大于该值时,说明桶中permits多,流量小,即可能在预热
    • maxPermits:暂存permits数量的最大值
  • 纵轴:表示生成每个permits的时间间隔

    • stableInterval:预热完毕后的生成permits间隔
    • coldInterval:冷启动时的生成permits间隔
    • coldFactor:一个因子,硬编码为3,满足stableInterval * coldFactor = coldInterval
  • warmUpPeriod:即上面的梯形,该面积就是“预热时长”,由此得到下面的公式:
    • thresholdPermits = 0.5 * warmupPeriod / stableInterval
    • maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)(根据梯形面积公式)
  • 原理说明:

    • storedPermits小的时候,流量大,即预热充分,速度也可以变快,所以生成permits间隔小
      • 流量重复大的时候,storedPermits -> 0,其生成permits间隔最小
    • storedPermits大的时候,流量小,即在预热中,速度需要限制,所以生成permits间隔大
      • 冷启动的时候,storedPermits = maxPermits,其生成permits间隔最大

5.2. 创建

创建SmoothWarmingUp主要是调用第2节的(2),(3)这2个方法,最后都会进入下面这个方法:

1
2
3
4
5
6
7
8
9
10
static RateLimiter create(
      double permitsPerSecond,
      long warmupPeriod,
      TimeUnit unit,
      double coldFactor,
      SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

SmoothWarmingUp主要有下面几个字段:

  • warmupPeriodMicros:以微秒计的预热时间
  • slope:5.1.节图上那条直线的斜率
  • coldFactor:值为3,意思见5.1.节
  • thresholdPermits:之后设置速率时,会被计算,意思见5.1.节

创建SmoothWarmingUp实例后,会初始化上面3个变量,而最后1个变量在设置速率的时候被计算。

5.3. 设置流量速率

总体流程,和SmoothBursty一样,唯一不同的就是方法doSetRate(double, double)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = maxPermits;
    // 1. 根据公式计算coldIntervalMicros, thresholdPermits, maxPermits, slopes
    double coldIntervalMicros = stableIntervalMicros * coldFactor;
    thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
    maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
    slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        storedPermits = 0.0;
    } else {
        // 2. 等比更改暂存的permits数量
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
    }
}

这里不同之处就在于利用5.2.节的公式,计算一系列参数,其它部分和SmoothBursty并没有什么不同。

5.4. 同步/重新计算桶中存放的permits个数

SmoothBursty一样,首先需要resync(long),当计算新增的permits时,会调用下面的代码:

1
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();

注意,这个代码块内,nowMicros > nextFreeTicketMicros,说明流量小,在预热

注意到函数coolDownIntervalMicros()

  • SmoothBursty中,它直接返回了stableIntervalMicros(因为预热不预热都一样)

  • SmoothWarmingUp中,它返回值如下代码所示,代表冷却时间间隔:

    1
    2
    3
    
    double coolDownIntervalMicros() {
        return warmupPeriodMicros / maxPermits;
    }
    

可见,在SmoothWarmingUp中,重新计算获取到newPermits会变小,也引证了预热限流的特点。

5.5. 获取permits/tokens

整体流程,和SmoothBursty一样,我们主要关注方法reserveEarliestAvailable(int, long),这里回顾一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 1. 更新storedPermits和nextFreeTicketMicros(若很久没有获取令牌时)
    resync(nowMicros);
    // 直接返回旧的nextFreeTicketMicros
    long returnValue = nextFreeTicketMicros;
    // 当前可用的permits数(需要消费的最大permits数,不超过storedPermits)
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // 需要另外获取的permits数
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // 2. 计算不够的部分,需要等待多少时间
    long waitMicros =
        // 这里该方法返回0,从桶中获取permits不需要另外等待
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);
	// 3. 把nextFreeTicketMicros往后推(如果permits不够需要等待)
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    // 4. 拿走permits
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}

而这里,重点关注storedPermitsToWaitTime(double, double)

  • SmoothBursty中,它直接返回0

  • SmoothWarmingUp中,代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    
    // storedPermits代表现在桶中剩下的permits
    // permitsToTake代表请求需要拿走的permits,但是不超过storePermits
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
        // 这里求了5.1.节曲线的定积分
        // 下界是storedPermits,上界是permitsToTake
        double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
        long micros = 0;
        // 1. 求右边梯形的积分
        if (availablePermitsAboveThreshold > 0.0) {
            double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
            double length =
                permitsToTime(availablePermitsAboveThreshold)
                    + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
            micros = (long) (permitsAboveThresholdToTake * length / 2.0);
            permitsToTake -= permitsAboveThresholdToTake;
        }
        // 2. 求左边矩形积分
        micros += (long) (stableIntervalMicros * permitsToTake);
        // 3. 求和积分并返回,即从桶中取permits需要占用的时间(这会增加下一个请求的等待时间)
        return micros;
    }
      
    // 给定permits(x轴),计算interval(y轴)
    private double permitsToTime(double permits) {
        return stableIntervalMicros + permits * slope;
    }
    

而其他情况下,SmoothWarmingUpSmoothBursty一样。

6. 总结

Guava的RateLimiter实现主要以令牌桶算法为基础:

  • 它通过预占permits,以影响下一个请求的等待时间(以nextFreeTicketMicros为锚,它会随请求往后推延,表示请求可以获得permits的时间)
  • 桶中permits的添加,不需要另一个线程,只需请求过来时,根据时间戳重新计算即可

Guava有2个RateLimiter的实现:

  • SmoothBursty:适用于一些突发流量来临的限流器,因为其缓存了permits
  • SmoothWarmingUp:适用于一些需要预热的限流器,因为其由于保存permits多少,调整其生成的速度(permits越高,说明越冷,其生成速度越慢)