Go开发企业级微服务网关(6)ReverseProxy

之前的简易版 HTTP 代理不具备下面这些功能:

  • 错误回调及错误日志处理
  • 更改代理返回内容
  • 负载均衡
  • url 重写
  • 限流、熔断、降级
  • 数据统计
  • 权限验证

大纲:用 ReverseProxy 实现一个 HTTP 代理

  • ReverseProxy 功能点
  • ReverseProxy 示例
  • ReverseProxy 源码分析

拓展 ReverseProxy 功能:

  • 4种负载轮询类型实现以及接口封装
  • 拓展中间件支持:限流、熔断实现、权限、数据统计

ReverseProxy 功能点

  • 更改内容支持
  • 错误信息回调
  • 支持自定义负载均衡
  • url 重写功能
  • 连接池功能
  • 支持 websocket 服务
  • 支持 https 代理

用 ReverseProxy 实现一个 HTTP 代理

代码如下:

package main

import (
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
)

var addr = "127.0.0.1:2002"

func main() {
	rs1 := "http://127.0.0.1:2003/base"
	url1, err1 := url.Parse(rs1)
	if err1 != nil {
		log.Println(err1)
	}
	proxy := httputil.NewSingleHostReverseProxy(url1)
	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, proxy))
}

ReverseProxy 更改内容支持

	// ModifyResponse is an optional function that modifies the
	// Response from the backend. It is called if the backend
	// returns a response at all, with any HTTP status code.
	// If the backend is unreachable, the optional ErrorHandler is
	// called without any call to ModifyResponse.
	//
	// Hop-by-hop headers are removed from the response before
	// calling ModifyResponse. ModifyResponse may need to remove
	// additional headers to fit its deployment model, such as Alt-Svc.
	//
	// If ModifyResponse returns an error, ErrorHandler is called
	// with its error value. If ErrorHandler is nil, its default
	// implementation is used.
	ModifyResponse func(*http.Response) error

NewSingleHostReverseProxy 部分实现的功能如下:

func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {
	director := func(req *http.Request) {
		rewriteRequestURL(req, target)
	}
	return &ReverseProxy{Director: director}
}

func rewriteRequestURL(req *http.Request, target *url.URL) {、
	// http:127.0.0.1:2002/dir?name=123
	// RawQuery: name=123
	// Scheme: http
	// Host: 127.0.0.1:2002
	targetQuery := target.RawQuery
	req.URL.Scheme = target.Scheme
	req.URL.Host = target.Host
	req.URL.Path, req.URL.RawPath = joinURLPath(target, req.URL)
	if targetQuery == "" || req.URL.RawQuery == "" {
		req.URL.RawQuery = targetQuery + req.URL.RawQuery
	} else {
		req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
	}
}

当我们希望对后端返回的响应内容进行修改时,可以实现自定义的 modifyResponse ,随后通过创建自定义的 ReverseProxy 而不是使用 NewSingleHostReverseProxy 来实现自定义函数的挂载,实现代码如下:

package main

import (
	"bytes"
	"io"
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"strconv"
)

var addr = "127.0.0.1:2002"

func main() {
	rs1 := "http://127.0.0.1:2003/base"
	url1, err1 := url.Parse(rs1)
	if err1 != nil {
		log.Println(err1)
	}
	// proxy := httputil.NewSingleHostReverseProxy(url1)
	proxy := &httputil.ReverseProxy{
		Rewrite: func(r *httputil.ProxyRequest) {
			r.SetURL(url1)
			r.Out.Host = r.In.Host
		},
		ModifyResponse: modifyResponse,
	}

	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, proxy))
}

func modifyResponse(resp *http.Response) error {
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil
	}

	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	resp.Body.Close()

	modifiedBody := append([]byte("Modified: "), bodyBytes...)
	resp.Body = io.NopCloser(bytes.NewReader(modifiedBody))
	resp.ContentLength = int64(len(modifiedBody))
	resp.Header.Set("Content-Length", strconv.Itoa(len(modifiedBody)))

	return nil
}

ReverseProxy 特殊 Header

Connection

标记请求发起方与第一代理的状态
决定当前事务完成后,是否会关闭网络

  • Connection: keep-alive 不关闭网络
  • Connection: close 关闭网络
  • Connection: upgrade 协议升级

TE 、Trailer

  • TE 是 request_header,表示希望使用的传输编码类型

如:TE: trailers, deflate; q=0.5 表示,期望在采用分块传输编码响应中接收挂载字段, zlib 编码,0.5优先级排序

  • Trailer 是 response_header,允许发送方在消息后面添加额外的元信息

如:Trailer: Expires 表示,Expires 将出现在分块信息的结尾

第一代理除去标准的逐段传输头(hop-by-hop)

  • 逐段传输头都需要在 Connection 头中列出
  • 第一个代理知道必须处理它们且不转发它们
  • 逐段传输头:Keep-Alive, Transfer-Encoding, TE, Connection, Trailer, Upgrade, Proxy-Authorization, Proxy-Authenticate

ReverseProxy 补充知识——特殊 StatusCode

  • 100 表示目前一切正常,客户端可以继续请求
  • 101 表示服务端发送给客户端升级协议的请求

100-continue

客户端要 POST 的数据大于 1024 字节的时候,客户端不会直接发起 POST 请求,而是会分两步:

  1. 发送一个请求,包含一个 Expect:100-continue,询问 Server 是否愿意接收数据
  2. 接收到 Server 返回的 100-continue 应答以后,返回 100 状态,才把数据 POST 给 Server

101-Switching Protocols

ReverseProxy 源码分析

核心结构体代码如下:

// ReverseProxy 是一个反向代理处理器,将请求转发给后端服务器,并将响应返回给客户端。
type ReverseProxy struct {
	// Rewrite 修改请求,生成发往后端的新请求。不能与 Director 同时设置。
	// 推荐使用 Rewrite,它更可控。
	Rewrite func(*ProxyRequest)

	// Director 修改请求,功能同 Rewrite,是旧版本的方式。不能与 Rewrite 同时设置。
	Director func(*http.Request)

	// Transport 执行代理请求,默认使用 http.DefaultTransport。
	Transport http.RoundTripper

	// FlushInterval 控制响应刷新的间隔。0 表示不主动刷新,负数表示每次写入后立即刷新。
	FlushInterval time.Duration

	// ErrorLog 记录代理过程中的错误,默认使用标准日志包。
	ErrorLog *log.Logger

	// BufferPool 用于复用缓冲区,提升性能。
	BufferPool BufferPool

	// ModifyResponse 可修改后端返回的响应(任意状态码)。若返回错误,则调用 ErrorHandler。
	ModifyResponse func(*http.Response) error

	// ErrorHandler 处理后端不可达或 ModifyResponse 返回的错误。默认返回 502 错误。
	ErrorHandler func(http.ResponseWriter, *http.Request, error)
}

ServeHTTP 核心逻辑:

// ServeHTTP 实现了 http.Handler 接口,处理反向代理的核心逻辑。
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	// 确定使用的传输层(Transport)
	transport := p.Transport
	if transport == nil {
		transport = http.DefaultTransport
	}

	ctx := req.Context()
	// 处理客户端连接关闭的通知(仅当请求上下文没有取消信号时)
	if ctx.Done() != nil {
		// 如果请求上下文已包含取消信号(如超时、主动取消),
		// 则不需要再额外监听 CloseNotify(它已被上下文取代)。
		// 这里不做任何操作。
	} else if cn, ok := rw.(http.CloseNotifier); ok {
		// 如果响应器支持 CloseNotifier(即可以感知客户端连接关闭),
		// 则创建一个可取消的上下文,并在单独的 goroutine 中监听关闭通知。
		var cancel context.CancelFunc
		ctx, cancel = context.WithCancel(ctx)
		defer cancel()
		notifyChan := cn.CloseNotify()
		go func() {
			select {
			case <-notifyChan:
				cancel() // 客户端连接关闭,取消上下文
			case <-ctx.Done():
				// 上下文已被取消(例如请求处理完成),退出 goroutine
			}
		}()
	}

	// 克隆原始请求,以便后续修改(如修改 URL、Header 等),同时绑定新的上下文 ctx。
	outreq := req.Clone(ctx)
	if req.ContentLength == 0 {
		// 如果原始请求内容长度为 0,将出站请求的 Body 设为 nil,
		// 避免 http.Transport 重试时出现问题(参见 Issue 16036)。
		outreq.Body = nil
	}
	if outreq.Body != nil {
		// 确保出站请求的 Body 最终被关闭,防止 goroutine 泄露。
		// 注意:RoundTrip 可能在处理程序返回后仍在读取 Body,
		// 关闭 Body 可以避免崩溃(参见 Issue 46866)。
		defer outreq.Body.Close()
	}
	if outreq.Header == nil {
		// 保证 Header 不为 nil,保持历史行为(参见 Issue 33142)
		outreq.Header = make(http.Header)
	}

	// 检查是否同时设置了 Director 和 Rewrite(两者互斥)
	if (p.Director != nil) == (p.Rewrite != nil) {
		p.getErrorHandler()(rw, req, errors.New("ReverseProxy 必须且只能设置 Director 或 Rewrite 中的一个"))
		return
	}

	// 如果使用 Director 方式,调用它修改请求
	if p.Director != nil {
		p.Director(outreq)
		if outreq.Form != nil {
			// 清理无法解析的查询参数
			outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery)
		}
	}
	outreq.Close = false // 禁用出站请求的 Close 标志(让 Transport 自行决定是否重用连接)

	// 检查是否为协议升级请求(如 WebSocket)
	reqUpType := upgradeType(outreq.Header)
	if !ascii.IsPrint(reqUpType) {
		// 协议名称必须是可打印字符,否则拒绝
		p.getErrorHandler()(rw, req, fmt.Errorf("客户端尝试切换到无效协议 %q", reqUpType))
		return
	}
	// 移除逐跳头部(hop-by-hop headers),如 Connection、Keep-Alive 等
	removeHopByHopHeaders(outreq.Header)

	// 如果原始请求中声明支持 trailer,则在出站请求中也声明支持(参见 Issue 21096)
	if httpguts.HeaderValuesContainsToken(req.Header["Te"], "trailers") {
		outreq.Header.Set("Te", "trailers")
	}

	// 如果是协议升级请求,重新添加必要的 Connection 和 Upgrade 头部
	if reqUpType != "" {
		outreq.Header.Set("Connection", "Upgrade")
		outreq.Header.Set("Upgrade", reqUpType)
	}

	// 如果使用 Rewrite 方式
	if p.Rewrite != nil {
		// 移除客户端可能提供的转发头部,让 Rewrite 函数重新设置(如果需要)
		outreq.Header.Del("Forwarded")
		outreq.Header.Del("X-Forwarded-For")
		outreq.Header.Del("X-Forwarded-Host")
		outreq.Header.Del("X-Forwarded-Proto")

		// 清理无法解析的查询参数
		outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery)

		// 构造 ProxyRequest 并调用 Rewrite 函数
		pr := &ProxyRequest{
			In:  req,
			Out: outreq,
		}
		p.Rewrite(pr)
		outreq = pr.Out // 使用 Rewrite 可能返回修改后的新请求
	} else {
		// 使用 Director 方式时的默认行为:设置 X-Forwarded-For 头部
		if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
			// 如果出站请求中已存在 X-Forwarded-For 头部,则将当前客户端 IP 追加到后面
			prior, ok := outreq.Header["X-Forwarded-For"]
			omit := ok && prior == nil // Issue 38079:nil 值表示不要设置该头部
			if len(prior) > 0 {
				clientIP = strings.Join(prior, ", ") + ", " + clientIP
			}
			if !omit {
				outreq.Header.Set("X-Forwarded-For", clientIP)
			}
		}
	}

	// 如果出站请求没有设置 User-Agent,则设为空字符串(避免使用默认的 Go User-Agent)
	if _, ok := outreq.Header["User-Agent"]; !ok {
		outreq.Header.Set("User-Agent", "")
	}

	// 用于跟踪 1xx 响应(如 100 Continue)的同步机制
	var (
		roundTripMutex sync.Mutex
		roundTripDone  bool
	)
	trace := &httptrace.ClientTrace{
		Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
			roundTripMutex.Lock()
			defer roundTripMutex.Unlock()
			if roundTripDone {
				// 如果 RoundTrip 已经完成,不再修改响应头
				return nil
			}
			h := rw.Header()
			copyHeader(h, http.Header(header))
			rw.WriteHeader(code)

			// 对于 1xx 响应,WriteHeader 不会自动清除之前的头部,需要手动清除
			clear(h)
			return nil
		},
	}
	// 将追踪对象绑定到出站请求的上下文中
	outreq = outreq.WithContext(httptrace.WithClientTrace(outreq.Context(), trace))

	// 使用 Transport 发送请求,获得后端响应
	res, err := transport.RoundTrip(outreq)
	roundTripMutex.Lock()
	roundTripDone = true
	roundTripMutex.Unlock()
	if err != nil {
		// 发生错误(如后端不可达),调用错误处理器
		p.getErrorHandler()(rw, outreq, err)
		return
	}

	// 处理协议升级响应(101 Switching Protocols),如 WebSocket
	if res.StatusCode == http.StatusSwitchingProtocols {
		if !p.modifyResponse(rw, res, outreq) {
			return
		}
		p.handleUpgradeResponse(rw, outreq, res)
		return
	}

	// 移除响应中的逐跳头部
	removeHopByHopHeaders(res.Header)

	// 调用 ModifyResponse 钩子(如果设置),允许修改响应
	if !p.modifyResponse(rw, res, outreq) {
		return
	}

	// 将后端响应的头部复制到客户端响应中
	copyHeader(rw.Header(), res.Header)

	// 处理 trailer(尾部)头部:Transport 不会自动包含 Trailer 头部,需要从 res.Trailer 构建
	announcedTrailers := len(res.Trailer)
	if announcedTrailers > 0 {
		trailerKeys := make([]string, 0, len(res.Trailer))
		for k := range res.Trailer {
			trailerKeys = append(trailerKeys, k)
		}
		rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
	}

	// 写入响应状态码
	rw.WriteHeader(res.StatusCode)

	// 将响应体复制到客户端,并根据 flushInterval 刷新
	err = p.copyResponse(rw, res.Body, p.flushInterval(res))
	if err != nil {
		defer res.Body.Close()
		// 复制过程中出错,根据是否应该 panic 决定处理方式
		if !shouldPanicOnCopyError(req) {
			p.logf("测试中抑制 copyResponse 错误导致的 panic;copy 错误:%v", err)
			return
		}
		// 触发中止处理器的 panic,中断请求
		panic(http.ErrAbortHandler)
	}
	res.Body.Close() // 现在关闭 Body 以填充 res.Trailer

	// 如果响应中带有 trailer,强制分块传输(避免添加 Content-Length)
	if len(res.Trailer) > 0 {
		http.NewResponseController(rw).Flush()
	}

	// 写入 trailer 头部
	if len(res.Trailer) == announcedTrailers {
		// 简单情况:直接复制 trailer
		copyHeader(rw.Header(), res.Trailer)
		return
	}

	// 复杂情况:某些 trailer 可能未被预先声明,使用 TrailerPrefix 前缀写入
	for k, vv := range res.Trailer {
		k = http.TrailerPrefix + k
		for _, v := range vv {
			rw.Header().Add(k, v)
		}
	}
}

发表评论