按比例控制流量的一种实现
网关做灰度的时候,要控制流量的比例,比如 3:7 的分发流量到两个不同版本的服务上去。刚开始的想法是每次流量过来生成 100 以内的随机数,随机数落在那个区间就转到那个版本的服务上去,但是发现这样无法较精准的保证 3:7 的比例,因为有可能某段时间内生成的随机数大范围的落在某个区间内,比如请求了 100 次,每次生成的随机数都是大于 30 的,这样 70% 比例的服务就承受了 100% 的流量。
接下来想到了第二种解决方案,能够保证 10(基数) 倍的流量比例正好是 3:7,思路如下:
1、生成 0 - 99 的数组(集合) 2、打乱数组(集合)的顺序,为了防止出现某比例的流量集中出现 3、全局的计数器,要考虑原子性 4、从数组(集合)中取出计数器和 100 取余后位置的值 5、判断取到的值落在那个区间
以下是 Java 的简单实现:
package io.github.ehlxr.rate;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 按比例控制流量
*
* @author ehlxr
* @since 2019-07-19.
*/
public class RateBarrier {
private AtomicInteger op = new AtomicInteger(0);
private List<Integer> source;
private int base;
private int rate;
public boolean allow() {
return source.get(op.incrementAndGet() % base) < rate;
}
private RateBarrier() {
}
public RateBarrier(int base, int rate) {
this.base = base;
this.rate = rate;
source = new ArrayList<>(base);
for (int i = 0; i < base; i++) {
source.add(i);
}
// 打乱集合顺序
Collections.shuffle(source);
}
}
以下是 3:7 流量控制的测试:
package io.github.ehlxr.rate;
/**
* @author ehlxr
* @since 2019-07-19.
*/
public class Main {
public static void main(String[] args) {
RateBarrier rateBarrier = new RateBarrier(10, 3);
final Thread[] threads = new Thread[20];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
if (rateBarrier.allow()) {
System.out.println("this is on 3");
} else {
System.out.println("this is on 7");
}
});
threads[i].start();
}
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// Output:
/*
this is on 7
this is on 7
this is on 7
this is on 7
this is on 3
this is on 7
this is on 3
this is on 7
this is on 7
this is on 3
this is on 7
this is on 7
this is on 7
this is on 7
this is on 3
this is on 7
this is on 3
this is on 7
this is on 7
this is on 3
*/
以下是 Golang 版本 2:3:5 比例分流的简单实现:
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
)
type RateBarrier struct {
source []int
op uint64
base int
}
func NewRateBarrier(base int) *RateBarrier {
source := make([]int, base, base)
for i := 0; i < base; i++ {
source[i] = i
}
// 随机排序
rand.Shuffle(base, func(i, j int) {
source[i], source[j] = source[j], source[i]
})
return &RateBarrier{
source: source,
base: base,
}
}
func (b *RateBarrier) Rate() int {
return b.source[int(atomic.AddUint64(&b.op, 1))%b.base]
}
func main() {
var wg sync.WaitGroup
wg.Add(20)
// 2:3:5
b := NewRateBarrier(10)
for i := 0; i < 20; i++ {
go func() {
rate := b.Rate()
switch {
case rate < 2:
fmt.Println("this is on 20%")
case rate >= 2 && rate < 5:
fmt.Println("this is on 30%")
case rate >= 5:
fmt.Println("this is on 50%")
}
wg.Done()
}()
}
wg.Wait()
}
// Output:
/*
this is on 30%
this is on 50%
this is on 30%
this is on 20%
this is on 50%
this is on 50%
this is on 50%
this is on 20%
this is on 30%
this is on 20%
this is on 50%
this is on 30%
this is on 30%
this is on 50%
this is on 50%
this is on 50%
this is on 20%
this is on 50%
this is on 50%
this is on 30%
*/