回顾与目标:
在之前的实现中, ReverseProxy 函数接收一个固定的 target *url.URL 。
要实现负载均衡,需要:
- 管理一个后端列表(支持多个目标 URL )。
- 根据不同的策略(轮询、随机、最少连接、IP 哈希)从列表中选择一个目标
- 保证在多 goroutine 并发下,选择过程是安全的
一、设计数据结构:
1. 后端节点(Backend)
每个后端需要包含:
URL:后端的地址(比如http://localhost:8001)Alive:是否健康(下一阶段实现,先预留)Connections:当前活跃连接数(用于“最少连接数”算法)
type Backend struct{
URL *url.URL
Alive bool // 是否存活(健康检查用)
Connections int64 // 当前活跃连接数(原子操作)
}
2. 负载均衡器(LoadBalancer)
负载均衡器管理所有后端,并提供选择方法。
type LoadBalancer struct{
backends []*Backend
current uint64 // 轮询用,指向下一个要选择的后端索引
mu sync.RWMutex // 保护 backends 切片(比如动态增删后端)
}
3. 初始化
可以从配置文件或硬编码创建几个后端:
func NewLoadBalancer(urls []string) *LoadBalancer{
var backends []*Backend
for _, u := range urls{
parsed, _ := url.Parse(u)
backends = append(backends, &Backend{
URL: parsed,
Alive: true,
Connections: 0,
})
}
return &LoadBalancer{
backends: backends,
current: 0,
}
}
二、实现负载均衡算法
1. 轮询(Round Robin)
轮询就是按照顺序轮流选择后端。
关键在于并发安全:多个请求同时进来,不能都选同一个。
可以利用原子操作来递增索引。
func (lb *LoadBalancer) NextRoundRobin() *Backend {
lb.mu.RLock() // 加读锁,禁止其他 goroutine 修改 backends
defer lb.mu.RUnlock()
// 注意到:一致性哈希可以避免越界问题,后续进行学习
n := uint64(len(lb.backends))
if n == 0{
return nil
}
// 原子地增加 current ,并取模
next := atomic.AddUint64(&lb.current, 1) % n
return lb.backends[next]
}
2. 随机(Random)
随机选择一个后端。
这里使用 math/rand 来进行随机选择,注意 rand 包默认不是并发安全的,因此这里采用加锁保护 rand 调用。
var rngMu sync.Mutex
func (lb *LoadBalancer) NextRandom() *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
n := len(lb.backends)
if n == 0{
return nil
}
rngMu.Lock()
idx := rand.Intn(n)
rngMu.Unlock()
// 更好的做法:使用 rand.New(rand.NewSource(time.Now().UnixNano()))
// 创建一个私有的随机源,并用一个 sync.Mutex 保护它
return lb.backends[idx]
}
3. 最少连接数(Least Connections)
选择当前活跃连接数最小的后端。
活跃连接数 Connections 需要在转发请求时增加,在请求完成后减少。
func (lb *LoadBalancer) NextLeastConnections() *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
var selected *Backend
var minConns int64 = 1<<63 - 1
for _, b := range lb.backends {
if !b.Alive {
continue
}
conns := atomic.LoadInt64(&b.Connections)
if conns < minConns {
minConns = conns
selected = b
}
}
return selected
}
4. IP 哈希(IP Hash)
根据客户端 IP 计算哈希值,然后映射到后端。
这样可以保证同一个客户端的请求总是打到同一个后端(如果后端数量不变的话)。
func (lb *LoadBalancer) NextIPHash(ip string) *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
n := len(lb.backends)
if n == 0 {
return nil
}
// 简单的哈希:把 IP 字符串转成整数然后取模
h := fnv.New32a()
h.Write([]byte(ip))
hash := h.Sum32()
idx := int(hash) % n
return lb.backends[idx]
}
注意:IP 哈希需要在反向代理处理器中获取客户端 IP 。可以从 r.RemoteAddr 拿到,需要分割出 IP 。如果有 X-Forwarded-For 头,可能还需要处理代理链的情况,这里的处理先直接使用 r.RemoteAddr 。
三、集成到 ReverseProxy
现在修改 ReverseProxy 函数,让它不再接收单个 target ,而是接收一个负载均衡器实例,并根据策略选择后端。
func ReverseProxy(lb *LoadBalancer, strategy string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// 1. 根据策略选择后端
var backend *Backend
switch strategy{
case "round-robin":
backend = lb.NextRoundRobin()
case "random":
backend = lb.NextRandom()
case "least-conn":
backend = lb.NextLeastConnections()
case "ip-hash":
ip, _, _ := net.SplitHostPort(r.RemoteAddr)
backend = lb.NextIPHash(ip)
default:
http.Error(w, "unknown strategy", http.StatusInternalServerError)
return
}
if backend == nil {
http.Error(w, "no available backend", http.StatusServiceUnavailable)
return
}
// 2. 增加连接数(用于最少连接数策略)
atomic.AddInt64(&backend.Connections, 1)
// 在函数返回前减少连接数
defer atomic.AddInt64(&backend.Connections, -1)
// 3. 构建转发请求(与之前相同,但 target 换成 backend.URL)
proxyURL := backend.URL.ResolveReference(r.URL)
req, err := http.NewRequest(r.Method, proxyURL.String(), r.Body)
if err != nil{
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Header = r.Header.Clone()
req.Host = backend.URL.Host
// 4. 发送请求
client := http.DefaultClient
resp, err := client.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
defer resp.Body.Close()
// 5. 写回响应(注意顺序:先 Header,再 WriteHeader)
for key, values := range resp.Header{
for _, value := range values{
w.Header().Add(key, value)
}
}
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}
}
四、并发安全问题讨论
这里的 lb.mu 读写锁主要保护 backends 切片本身,防止在遍历时切片被修改(比如健康检查动态移除后端)。目前我们还没实现动态更新,但先加上,为后续准备。
五、完整示例代码
为了方便测试,可以在 main 函数中创建几个测试后端地址,然后启动代理。
代码如下:
package main
import (
"hash/fnv"
"io"
"log"
"math/rand"
"net"
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"
)
type Backend struct{
URL *url.URL
Alive bool // 是否存活(健康检查用)
Connections int64 // 当前活跃连接数(原子操作)
}
type LoadBalancer struct{
backends []*Backend
current uint64 // 轮询用,指向下一个要选择的后端索引
mu sync.RWMutex // 保护 backends 切片(比如动态增删后端)
}
// 初始化
func NewLoadBalancer(urls []string) *LoadBalancer{
var backends []*Backend
for _, u := range urls{
parsed, _ := url.Parse(u)
backends = append(backends, &Backend{
URL: parsed,
Alive: true,
Connections: 0,
})
}
return &LoadBalancer{
backends: backends,
current: 0,
}
}
// 实现负载均衡算法
// 1. 轮询
func (lb *LoadBalancer) NextRoundRobin() *Backend {
lb.mu.RLock() // 加读锁,禁止其他 goroutine 修改 backends
defer lb.mu.RUnlock()
// 注意到:一致性哈希可以避免越界问题,后续进行学习
n := uint64(len(lb.backends))
if n == 0{
return nil
}
// 原子地增加 current ,并取模
next := atomic.AddUint64(&lb.current, 1) % n
return lb.backends[next]
}
// 2. 随机
var rngMu sync.Mutex
func (lb *LoadBalancer) NextRandom() *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
n := len(lb.backends)
if n == 0{
return nil
}
rngMu.Lock()
idx := rand.Intn(n)
rngMu.Unlock()
// 更好的做法:使用 rand.New(rand.NewSource(time.Now().UnixNano()))
// 创建一个私有的随机源,并用一个 sync.Mutex 保护它
return lb.backends[idx]
}
// 3. 最少连接数
func (lb *LoadBalancer) NextLeastConnections() *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
var selected *Backend
var minConns int64 = 1<<63 - 1
for _, b := range lb.backends {
if !b.Alive {
continue
}
conns := atomic.LoadInt64(&b.Connections)
if conns < minConns {
minConns = conns
selected = b
}
}
return selected
}
// 4. IP Hash
func (lb *LoadBalancer) NextIPHash(ip string) *Backend {
lb.mu.RLock()
defer lb.mu.RUnlock()
n := len(lb.backends)
if n == 0 {
return nil
}
// 简单的哈希:把 IP 字符串转成整数然后取模
h := fnv.New32a()
h.Write([]byte(ip))
hash := h.Sum32()
idx := int(hash) % n
return lb.backends[idx]
}
func ReverseProxy(lb *LoadBalancer, strategy string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// 1. 根据策略选择后端
var backend *Backend
switch strategy{
case "round-robin":
backend = lb.NextRoundRobin()
case "random":
backend = lb.NextRandom()
case "least-conn":
backend = lb.NextLeastConnections()
case "ip-hash":
ip, _, _ := net.SplitHostPort(r.RemoteAddr)
backend = lb.NextIPHash(ip)
default:
http.Error(w, "unknown strategy", http.StatusInternalServerError)
return
}
if backend == nil {
http.Error(w, "no available backend", http.StatusServiceUnavailable)
return
}
// 2. 增加连接数(用于最少连接数策略)
atomic.AddInt64(&backend.Connections, 1)
// 在函数返回前减少连接数
defer atomic.AddInt64(&backend.Connections, -1)
// 3. 构建转发请求(与之前相同,但 target 换成 backend.URL)
proxyURL := backend.URL.ResolveReference(r.URL)
req, err := http.NewRequest(r.Method, proxyURL.String(), r.Body)
if err != nil{
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
req.Header = r.Header.Clone()
req.Host = backend.URL.Host
// 4. 发送请求
client := http.DefaultClient
resp, err := client.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
defer resp.Body.Close()
// 5. 写回响应(注意顺序:先 Header,再 WriteHeader)
for key, values := range resp.Header{
for _, value := range values{
w.Header().Add(key, value)
}
}
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}
}
func main() {
backendURLs := []string {
"http://127.0.0.1:8001",
"http://127.0.0.1:8002",
"http://127.0.0.1:8003",
}
lb := NewLoadBalancer(backendURLs)
strategy := "round-robin"
http.HandleFunc("/", ReverseProxy(lb, strategy))
log.Println("Starting proxy server on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}