|
2019-05-17
如上图所示:X轴表示令牌桶中的令牌数量,y轴表示生产一个令牌需要的时间(秒);相关参数如下:
2. 换算关系
count:已知由用户设置,eg:每秒允许通过100请求。 warmUpPeriodInSec:已知由用户设置,默认10秒,时间区域上(2)梯形区域coldFactor,已经默认3。
公式1:
stableInterval=1/count(稳定生产一个令牌需要时间)
公式2:
coldInterval=stableInterval*coldFactor冷启动因子(生产一个令牌需要的最长时长)
公式3:
前提:由于coldFactor默认为3,y轴stableInterval~coldInterval的距离是0~stableInterval的距离两倍,时间区域上红色(2)梯形区域是红色1的长方形区域的两倍
坐标时间(1)长方形区域面积=长(thresholdPermits(warningToken))*宽(stableInterval)
公式4:
坐标时间(1)长方形区域面积=0.5*warmUpPeriodSec
公式5:
(thresholdPermits(warningToken))=0.5*warmUpPeriodSec/stableInterval
代码:
warningToken=(int)(warmUpPeriodInSec*count)/(coldFactor-1);
公式6:
前提:梯形的面积=(上低+下低)*高/2推导出maxPermits(maxToken)值
maxPermits=2*warmUpPeriodSec/(stableInterval+coldInterval)+thresholdPermits(warningToken)
代码:
maxToken=warningToken+(int)(2*warmUpPeriodInSec*count/(1.0+coldFactor));
公式7:
前提:斜率公式=(y1-y2)/(x1-x2)
slope=(coldInterval-stableInterval)/(maxToken-warningToken)
代码:
slope=(coldFactor-1.0)/count/(maxToken-warningToken);
3. 原理
当令牌桶中的令牌 < thresholdPermits(warningToken)时,令牌按照固定速率生产,请求流量稳定。
当令牌 > thresholdPermits(warningToken)时,开启预热,生产的令牌的速率 < 令牌滑落的速度,一段时间后,令牌 <= thresholdPermits(warningToken),请求回归到稳定状态,预热结束。
4.WarmUpControlle
private void construct(double count, int warmUpPeriodInSec,int coldFactor) { if (coldFactor <= 1) { throw new IllegalArgumentException("Cold factor should be larger than 1"); } this.count = count; this.coldFactor = coldFactor; // (1) thresholdPermits = 0.5 * warmupPeriod / stableInterval. // warningToken = 100; warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1); // (2) maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval) // maxToken = 200 maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor)); // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); slope = (coldFactor - 1.0) / count / (maxToken - warningToken); } public boolean canPass(Node node, int acquireCount, boolean prioritized) { long passQps = (long) node.passQps(); long previousQps = (long) node.previousPassQps(); // (1) 令牌的生产过程和滑落过程 syncToken(previousQps); // 开始计算它的斜率 // 如果进入了警戒线,开始调整他的qps long restToken = storedTokens.get(); // (2) 令牌桶中剩余的令牌数 if (restToken >= warningToken) { // (3-1) 当剩余令牌大于令牌桶阈值,开启预热 long aboveToken = restToken - warningToken; // 消耗的速度要比warning快,但是要比慢 // current interval = restToken*slope+1/count // (4)根据斜率计算出预热时的QPS double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); if (passQps + acquireCount <= warningQps) { //(5) 判断是否放行 return true; } } else { // (3-2) 剩余令牌小于令牌桶阈值, 不开启预热,流量稳定通行 if (passQps + acquireCount <= count) { return true; } } return false; } protected void syncToken(long passQps) { long currentTime = TimeUtil.currentTimeMillis(); // 去除尾数即秒数 currentTime = currentTime - currentTime % 1000; // 上次记录的时间戳 long oldLastFillTime = lastFilledTime.get(); if (currentTime <= oldLastFillTime) { return; } // 获取桶中令牌数 long oldValue = storedTokens.get(); // 令牌生产 long newValue = coolDownTokens(currentTime, passQps); if (storedTokens.compareAndSet(oldValue, newValue)) { // 2 将生产的令牌放入令牌桶 long currentValue = storedTokens.addAndGet(0 -passQps); // 从令牌桶中滑落已使用的令牌 if (currentValue < 0) { storedTokens.set(0L); } lastFilledTime.set(currentTime); } } private long coolDownTokens(long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; // 添加令牌的判断前提条件: // 当令牌的消耗程度远远低于警戒线的时候 if (oldValue < warningToken) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } else if (oldValue > warningToken) { if (passQps < (int)count / coldFactor) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } return Math.min(newValue, maxToken); } 排队等待 -
匀速排队方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过;对应的是漏桶算法。
这效果只针对QPS流控,并发线程流控不支持。
1. 匀速排队
有超时等待时间,一旦超过这个预定设置的时间将会被限流。
2. 漏桶算法(leakyBucket)
随机突发流量通过漏桶以后稳定的速率流出,起到流量控制和平滑作用。
3. 实现类
RateLimiterController:通过控制请求通过的时间间隔来实现达到匀速目的。
public boolean canPass(Node node, int acquireCount, boolean prioritized) { // Pass when acquire count is less or equal than 0. if (acquireCount <= 0) { return true; } // Reject when count is less or equal than 0. // Otherwise,the costTime will be max of long and waitTime will overflow in some cases. if (count <= 0) { return false; } long currentTime = TimeUtil.currentTimeMillis(); // Calculate the interval between every two requests. // 1) 两次请求的时间间隔 long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // Expected pass time of this request. // 2)计算这次请求的通过预留时间 = 上次请求通过时间+时间间隔 long expectedTime = costTime + latestPassedTime.get(); // 3)当预期时间 <= 当前时间, 则允许通过并更新上次请求时间戳 if (expectedTime <= currentTime) { // Contention may exist here, but it's okay. latestPassedTime.set(currentTime); return true; } else { // Calculate the time to wait. // 4)当预期时间 > 当前时间, 则需要等待; 计算需要等待时间 long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); // 5)需要等待的时间> 最⼤队列时间, 则拒绝 默认超时时间500毫秒 if (waitTime > maxQueueingTimeMs) { return false; } else { long oldTime = latestPassedTime.addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); // 6) if (waitTime > maxQueueingTimeMs) { latestPassedTime.addAndGet(-costTime); return false; } // in race condition waitTime may <= 0 // 7) if (waitTime > 0) { Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { } } } return false; }
说明:假设设置的阈值count=100即每秒允许100个请求,每次通过一个请求acquireCount=1,套入公式costTime=10,即两次请求的时间间隔为10秒。
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002