很多场景下都需要流量控制,从底层的数据包传输到购物秒杀。
常见的流量控制算法有两种,一个是漏桶算法,桶本身具有一个恒定的速率往下流水,而上方一直有水进入桶中。当桶还未满时,上方的水可以加入。一旦水满,上方的水就溢出,可以认为请求被丢弃。
这个算法不难理解,可以保证系统的稳定,因为水流出的速度一直是不变的,即使在有突发流量的情况下。但是如果到了网站秒杀的情境下,有突发流量,这个算法就不好用了。更常用的是令牌通算法。
令牌桶还是可以认为是一个盛满了令牌的箱子,每处理一个请求就会拿走一个令牌,如果令牌没有了,请求就会失败。而根据速率限制,令牌还是一直在添加的,所以还是有一部分的请求是可以慢慢的拿到令牌的。而一开始箱子里面是有一些令牌的,一开始的突发流量都可以成功的拿到令牌。
根据 https://github.com/celery/kombu/blob/master/kombu/utils/limits.py , 我简单改写了一下,可以很直观的看出来流量控制结果。
# coding=utf-8
import time
class TokenBucket(object):
def __init__(self, fill_rate, capacity):
self.capacity = float(capacity)
self._left_tokens = capacity
self.fill_rate = float(fill_rate)
self.timestamp = time.time()
def consume(self, tokens=1):
"""Return :const:`True` if the number of tokens can be consumed
from the bucket. If they can be consumed, a call will also consume the
requested number of tokens from the bucket. Calls will only consume
`tokens` (the number requested) or zero tokens -- it will never consume
a partial number of tokens."""
if tokens <= self.tokens:
self._left_tokens -= tokens
return True
return False
def expected_time(self, tokens=1):
"""Return the time (in seconds) when a new token is expected
to be available. This will not consume any tokens from the bucket."""
_tokens = self.tokens
tokens = max(tokens, _tokens)
return (tokens - _tokens) / self.fill_rate
@property
def tokens(self):
if self._left_tokens < self.capacity:
now = time.time()
delta = self.fill_rate * (now - self.timestamp)
self._left_tokens = min(self.capacity, self._left_tokens + delta)
self.timestamp = now
return self._left_tokens
true = false = 0
# 每秒往桶里面添加5个令牌,桶的大小是50
bucket = TokenBucket(fill_rate=5, capacity=50)
for i in range(300):
time.sleep(0.1)
if bucket.consume():
true += 1
print "Accepted", i
else:
false += 1
print "Dropped, time left ", bucket.expected_time()
print true, false
要注意的是,我上面的demo代码是存在存在并发竞态条件问题的,但是出于性能和简化写法的考虑,这个是可以接受的,毕竟流量限制一般也不需要太精确。