Google Guava的RateLimiter是一个简易的流控工具,近日阅读《京麦京东开发平台的高性能架构之路》一文,提到京东用RateLimiter+zk来做流控,于是资料查了一把,源码看了一波。RateLimiter使用的是一种叫令牌桶的流控算法,RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,比如你希望自己的应用程序QPS不要超过1000,那么RateLimiter设置1000的速率后,就会每秒往桶里扔1000个令牌。

RateLimiter rateLimiter = RateLimiter.create(1000);
...
rateLimiter.acquire(1); // sleep, 阻塞等待新的令牌
if(rateLimiter.tryAcquire(1)) {
    ...
} else {
    log.warn("flow limitted");
}

RateLimiter有两个实现类,SmoothBursty和SmoothWarmUp,从命名上可以看出SmoothWarmUp还提供了服务预热的能力,这点适用于需要预热缓存或者使用了分层编译的应用。RateLimiter提供两个工厂方法来创建这两个实现类的示例。UML图如下:

RateLimiter类

RateLimiter有几个主要的public方法:

public static RateLimiter create(double permitsPerSecond)

指定一个qps阈值,创建一个SmoothBursty实例。当qps超过阈值时,SmoothBursty会通过控制令牌的释放频次,保证qps不超过阈值。如果一段时间内没有流量,SmoothBursty还会存储令牌。

public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)

指定一个qps阈值和一段warmup时间,返回一个SmoothWarmUp实例。在warmup时间内,服务的qps以一定的斜率逐步上升直到阈值。源码中给出一段清晰的解释。而如果SmoothWarmUp在热身时间内没有足够饱和的请求, 则会逐渐降低到冷却状态

  /**
   * This implements the following function:
   *
   *          ^ throttling
   *          |
   * 3*stable +                  /
   * interval |                 /.
   *  (cold)  |                / .
   *          |               /  .   <-- "warmup period" is the area of the trapezoid between
   * 2*stable +              /   .       halfPermits and maxPermits
   * interval |             /    .
   *          |            /     .
   *          |           /      .
   *   stable +----------/  WARM . }
   * interval |          .   UP  . } <-- this rectangle (from 0 to maxPermits, and
   *          |          . PERIOD. }     height == stableInterval) defines the cooldown period,
   *          |          .       . }     and we want cooldownPeriod == warmupPeriod
   *          |---------------------------------> storedPermits
   *              (halfPermits) (maxPermits)
   */

setRate()和getRate()代理给了RateLimiter的子抽象类SmoothRateLimiter的doSetRate()和doGetRate():

  @Override
  final double doGetRate() {
    return SECONDS.toMicros(1L) / stableIntervalMicros;
  }
  @Override
  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

其中,doSetRate是个模板方法,先调用resync方法更新了下次释放令牌的时间nextFreeTicketMicros和目前保有的令牌数storedPermits。然后调用实现类自己的doSetRate().

public double acquire() {
    return acquire(1);
}

public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

acquire方法先“预定”n个令牌。如果有足够令牌,reserve立即修改storedPermits,否则,reservce告诉acquire方法还有microsToWait的时间要等待。于是acquire调用stopwatch睡上microsToWait长的时间,然后返回。真的是Thread.sleep().

  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }
  
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }
  
  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;

    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
        + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

可以看到,如果调用者要求拿到permits个令牌,如果目前保有的令牌数足够,则waitMicros=0,无需等待;否则,计算还要等多久才能等到剩下的requiredPermits - storedPermits个令牌。之羽还要等多久,和SmoothBusty和SmoothWarmingUp自己的策略有关(二者分别实现了storedPermitsToWaitTime方法)。

class SmoothBursty {
    ....
    
    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
    }
}

class SmoothWarmingUp {
    ....
    
    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      double availablePermitsAboveHalf = storedPermits - halfPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveHalf > 0.0) {
        double permitsAboveHalfToTake = min(availablePermitsAboveHalf, permitsToTake);
        micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)
            + permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);
        permitsToTake -= permitsAboveHalfToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      micros += (stableIntervalMicros * permitsToTake);
      return micros;
    }
}    

对于SmoothBursty,直接等待(freshPermits * 1/qps)时间,因为SmoothBursty每1/qps必然释放一个令牌;对于SmoothWarmingUp,则要等更久。