很多场景下都需要流量控制,从底层的数据包传输到购物秒杀。

常见的流量控制算法有两种,一个是漏桶算法,桶本身具有一个恒定的速率往下流水,而上方一直有水进入桶中。当桶还未满时,上方的水可以加入。一旦水满,上方的水就溢出,可以认为请求被丢弃。

这个算法不难理解,可以保证系统的稳定,因为水流出的速度一直是不变的,即使在有突发流量的情况下。但是如果到了网站秒杀的情境下,有突发流量,这个算法就不好用了。更常用的是令牌通算法。

令牌桶还是可以认为是一个盛满了令牌的箱子,每处理一个请求就会拿走一个令牌,如果令牌没有了,请求就会失败。而根据速率限制,令牌还是一直在添加的,所以还是有一部分的请求是可以慢慢的拿到令牌的。而一开始箱子里面是有一些令牌的,一开始的突发流量都可以成功的拿到令牌。

根据 https://github.com/celery/kombu/blob/master/kombu/utils/limits.py , 我简单改写了一下,可以很直观的看出来流量控制结果。

# coding=utf-8
import time


class TokenBucket(object):
    def __init__(self, fill_rate, capacity):
        self.capacity = float(capacity)
        self._left_tokens = capacity
        self.fill_rate = float(fill_rate)
        self.timestamp = time.time()

    def consume(self, tokens=1):
        """Return :const:`True` if the number of tokens can be consumed
        from the bucket.  If they can be consumed, a call will also consume the
        requested number of tokens from the bucket. Calls will only consume
        `tokens` (the number requested) or zero tokens -- it will never consume
        a partial number of tokens."""
        if tokens <= self.tokens:
            self._left_tokens -= tokens
            return True
        return False

    def expected_time(self, tokens=1):
        """Return the time (in seconds) when a new token is expected
        to be available. This will not consume any tokens from the bucket."""
        _tokens = self.tokens
        tokens = max(tokens, _tokens)
        return (tokens - _tokens) / self.fill_rate

    @property
    def tokens(self):
        if self._left_tokens < self.capacity:
            now = time.time()
            delta = self.fill_rate * (now - self.timestamp)
            self._left_tokens = min(self.capacity, self._left_tokens + delta)
            self.timestamp = now
        return self._left_tokens


true = false = 0

# 每秒往桶里面添加5个令牌,桶的大小是50
bucket = TokenBucket(fill_rate=5, capacity=50)
for i in range(300):
    time.sleep(0.1)
    if bucket.consume():
        true += 1
        print "Accepted", i
    else:
        false += 1
        print "Dropped, time left ", bucket.expected_time()
print true, false

要注意的是,我上面的demo代码是存在存在并发竞态条件问题的,但是出于性能和简化写法的考虑,这个是可以接受的,毕竟流量限制一般也不需要太精确。