基于Golang实现反向代理服务器(2)负载均衡算法

回顾与目标:

在之前的实现中, ReverseProxy 函数接收一个固定的 target *url.URL 。

要实现负载均衡,需要:

  1. 管理一个后端列表(支持多个目标 URL )。
  2. 根据不同的策略(轮询、随机、最少连接、IP 哈希)从列表中选择一个目标
  3. 保证在多 goroutine 并发下,选择过程是安全

一、设计数据结构:

1. 后端节点(Backend)

每个后端需要包含:

  • URL:后端的地址(比如 http://localhost:8001
  • Alive:是否健康(下一阶段实现,先预留)
  • Connections:当前活跃连接数(用于“最少连接数”算法)
type Backend struct{
	URL *url.URL
	Alive bool // 是否存活(健康检查用)
	Connections int64 // 当前活跃连接数(原子操作)
}

2. 负载均衡器(LoadBalancer)

负载均衡器管理所有后端,并提供选择方法。

type LoadBalancer struct{
	backends []*Backend
	current uint64 // 轮询用,指向下一个要选择的后端索引
	mu sync.RWMutex // 保护 backends 切片(比如动态增删后端)
}

3. 初始化

可以从配置文件或硬编码创建几个后端:

func NewLoadBalancer(urls []string) *LoadBalancer{
	var backends []*Backend
	for _, u := range urls{
		parsed, _ := url.Parse(u)
		backends = append(backends, &Backend{
			URL: parsed,
			Alive: true,
			Connections: 0,
		})
	}
	return &LoadBalancer{
		backends: backends,
		current: 0,
	}
}

二、实现负载均衡算法

1. 轮询(Round Robin)

轮询就是按照顺序轮流选择后端。

关键在于并发安全:多个请求同时进来,不能都选同一个。

可以利用原子操作来递增索引。

func (lb *LoadBalancer) NextRoundRobin() *Backend {
	lb.mu.RLock() // 加读锁,禁止其他 goroutine 修改 backends
    defer lb.mu.RUnlock()
	// 注意到:一致性哈希可以避免越界问题,后续进行学习

	n := uint64(len(lb.backends))
	if n == 0{
		return nil
	}

	// 原子地增加 current ,并取模
	next := atomic.AddUint64(&lb.current, 1) % n

	return lb.backends[next]
}

2. 随机(Random)

随机选择一个后端。

这里使用 math/rand 来进行随机选择,注意 rand 包默认不是并发安全的,因此这里采用加锁保护 rand 调用。

var rngMu sync.Mutex
func (lb *LoadBalancer) NextRandom() *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	n := len(lb.backends)
	if n == 0{
		return nil
	}

	rngMu.Lock()
	idx := rand.Intn(n)
	rngMu.Unlock()

	// 更好的做法:使用 rand.New(rand.NewSource(time.Now().UnixNano())) 
	// 创建一个私有的随机源,并用一个 sync.Mutex 保护它

	return lb.backends[idx]
}

3. 最少连接数(Least Connections)

选择当前活跃连接数最小的后端。

活跃连接数 Connections 需要在转发请求时增加,在请求完成后减少。

func (lb *LoadBalancer) NextLeastConnections() *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	var selected *Backend
	var minConns int64 = 1<<63 - 1

	for _, b := range lb.backends {
		if !b.Alive {
			continue
		}

		conns := atomic.LoadInt64(&b.Connections)
		if conns < minConns {
			minConns = conns
			selected = b
		}
	}

	return selected
}

4. IP 哈希(IP Hash)

根据客户端 IP 计算哈希值,然后映射到后端。

这样可以保证同一个客户端的请求总是打到同一个后端(如果后端数量不变的话)。

func (lb *LoadBalancer) NextIPHash(ip string) *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()



	n := len(lb.backends)
	if n == 0 {
		return nil
	}

	// 简单的哈希:把 IP 字符串转成整数然后取模
	h := fnv.New32a()
	h.Write([]byte(ip))
	hash := h.Sum32()
	idx := int(hash) % n
	return lb.backends[idx]
}

注意:IP 哈希需要在反向代理处理器中获取客户端 IP 。可以从 r.RemoteAddr 拿到,需要分割出 IP 。如果有 X-Forwarded-For 头,可能还需要处理代理链的情况,这里的处理先直接使用 r.RemoteAddr 。

三、集成到 ReverseProxy

现在修改 ReverseProxy 函数,让它不再接收单个 target ,而是接收一个负载均衡器实例,并根据策略选择后端。

func ReverseProxy(lb *LoadBalancer, strategy string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// 1. 根据策略选择后端
		var backend *Backend
		switch strategy{
		case "round-robin":
			backend = lb.NextRoundRobin()
		case "random":
			backend = lb.NextRandom()
		case "least-conn":
			backend = lb.NextLeastConnections()
		case "ip-hash":
			ip, _, _ := net.SplitHostPort(r.RemoteAddr)
			backend = lb.NextIPHash(ip)
		default:
			http.Error(w, "unknown strategy", http.StatusInternalServerError)
			return
		}

		if backend == nil {
			http.Error(w, "no available backend", http.StatusServiceUnavailable)
			return
		}

		// 2. 增加连接数(用于最少连接数策略)
		atomic.AddInt64(&backend.Connections, 1)
		// 在函数返回前减少连接数
		defer atomic.AddInt64(&backend.Connections, -1)

		// 3. 构建转发请求(与之前相同,但 target 换成 backend.URL)
		proxyURL := backend.URL.ResolveReference(r.URL)
		req, err := http.NewRequest(r.Method, proxyURL.String(), r.Body)
		if err != nil{
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		req.Header = r.Header.Clone()
		req.Host = backend.URL.Host

		// 4. 发送请求
		client := http.DefaultClient
		resp, err := client.Do(req)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadGateway)
			return
		}
		defer resp.Body.Close()

		// 5. 写回响应(注意顺序:先 Header,再 WriteHeader)
		for key, values := range resp.Header{
			for _, value := range values{
				w.Header().Add(key, value)
			}
		}
		w.WriteHeader(resp.StatusCode)
		io.Copy(w, resp.Body)
	}
}

四、并发安全问题讨论

这里的 lb.mu 读写锁主要保护 backends 切片本身,防止在遍历时切片被修改(比如健康检查动态移除后端)。目前我们还没实现动态更新,但先加上,为后续准备。

五、完整示例代码

为了方便测试,可以在 main 函数中创建几个测试后端地址,然后启动代理。

代码如下:

package main

import (
	"hash/fnv"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"sync"
	"sync/atomic"
	"time"
)

type Backend struct{
	URL *url.URL
	Alive bool // 是否存活(健康检查用)
	Connections int64 // 当前活跃连接数(原子操作)
}

type LoadBalancer struct{
	backends []*Backend
	current uint64 // 轮询用,指向下一个要选择的后端索引
	mu sync.RWMutex // 保护 backends 切片(比如动态增删后端)
}

// 初始化
func NewLoadBalancer(urls []string) *LoadBalancer{
	var backends []*Backend
	for _, u := range urls{
		parsed, _ := url.Parse(u)
		backends = append(backends, &Backend{
			URL: parsed,
			Alive: true,
			Connections: 0,
		})
	}
	return &LoadBalancer{
		backends: backends,
		current: 0,
	}
}

// 实现负载均衡算法

// 1. 轮询
func (lb *LoadBalancer) NextRoundRobin() *Backend {
	lb.mu.RLock() // 加读锁,禁止其他 goroutine 修改 backends
    defer lb.mu.RUnlock()
	// 注意到:一致性哈希可以避免越界问题,后续进行学习

	n := uint64(len(lb.backends))
	if n == 0{
		return nil
	}

	// 原子地增加 current ,并取模
	next := atomic.AddUint64(&lb.current, 1) % n

	return lb.backends[next]
}

// 2. 随机
var rngMu sync.Mutex
func (lb *LoadBalancer) NextRandom() *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	n := len(lb.backends)
	if n == 0{
		return nil
	}

	rngMu.Lock()
	idx := rand.Intn(n)
	rngMu.Unlock()

	// 更好的做法:使用 rand.New(rand.NewSource(time.Now().UnixNano())) 
	// 创建一个私有的随机源,并用一个 sync.Mutex 保护它

	return lb.backends[idx]
}

// 3. 最少连接数
func (lb *LoadBalancer) NextLeastConnections() *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	var selected *Backend
	var minConns int64 = 1<<63 - 1

	for _, b := range lb.backends {
		if !b.Alive {
			continue
		}

		conns := atomic.LoadInt64(&b.Connections)
		if conns < minConns {
			minConns = conns
			selected = b
		}
	}

	return selected
}

// 4. IP Hash
func (lb *LoadBalancer) NextIPHash(ip string) *Backend {
	lb.mu.RLock()
	defer lb.mu.RUnlock()



	n := len(lb.backends)
	if n == 0 {
		return nil
	}

	// 简单的哈希:把 IP 字符串转成整数然后取模
	h := fnv.New32a()
	h.Write([]byte(ip))
	hash := h.Sum32()
	idx := int(hash) % n
	return lb.backends[idx]
}

func ReverseProxy(lb *LoadBalancer, strategy string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// 1. 根据策略选择后端
		var backend *Backend
		switch strategy{
		case "round-robin":
			backend = lb.NextRoundRobin()
		case "random":
			backend = lb.NextRandom()
		case "least-conn":
			backend = lb.NextLeastConnections()
		case "ip-hash":
			ip, _, _ := net.SplitHostPort(r.RemoteAddr)
			backend = lb.NextIPHash(ip)
		default:
			http.Error(w, "unknown strategy", http.StatusInternalServerError)
			return
		}

		if backend == nil {
			http.Error(w, "no available backend", http.StatusServiceUnavailable)
			return
		}

		// 2. 增加连接数(用于最少连接数策略)
		atomic.AddInt64(&backend.Connections, 1)
		// 在函数返回前减少连接数
		defer atomic.AddInt64(&backend.Connections, -1)

		// 3. 构建转发请求(与之前相同,但 target 换成 backend.URL)
		proxyURL := backend.URL.ResolveReference(r.URL)
		req, err := http.NewRequest(r.Method, proxyURL.String(), r.Body)
		if err != nil{
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		req.Header = r.Header.Clone()
		req.Host = backend.URL.Host

		// 4. 发送请求
		client := http.DefaultClient
		resp, err := client.Do(req)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadGateway)
			return
		}
		defer resp.Body.Close()

		// 5. 写回响应(注意顺序:先 Header,再 WriteHeader)
		for key, values := range resp.Header{
			for _, value := range values{
				w.Header().Add(key, value)
			}
		}
		w.WriteHeader(resp.StatusCode)
		io.Copy(w, resp.Body)
	}
}

func main() {
	backendURLs := []string {
		"http://127.0.0.1:8001",
        "http://127.0.0.1:8002",
        "http://127.0.0.1:8003",
	}
	lb := NewLoadBalancer(backendURLs)

	strategy := "round-robin"

	http.HandleFunc("/", ReverseProxy(lb, strategy))

	log.Println("Starting proxy server on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

发表评论