之前的简易版 HTTP 代理不具备下面这些功能:
- 错误回调及错误日志处理
- 更改代理返回内容
- 负载均衡
- url 重写
- 限流、熔断、降级
- 数据统计
- 权限验证
大纲:用 ReverseProxy 实现一个 HTTP 代理
- ReverseProxy 功能点
- ReverseProxy 示例
- ReverseProxy 源码分析
拓展 ReverseProxy 功能:
- 4种负载轮询类型实现以及接口封装
- 拓展中间件支持:限流、熔断实现、权限、数据统计
ReverseProxy 功能点
- 更改内容支持
- 错误信息回调
- 支持自定义负载均衡
- url 重写功能
- 连接池功能
- 支持 websocket 服务
- 支持 https 代理
用 ReverseProxy 实现一个 HTTP 代理
代码如下:
package main
import (
"log"
"net/http"
"net/http/httputil"
"net/url"
)
var addr = "127.0.0.1:2002"
func main() {
rs1 := "http://127.0.0.1:2003/base"
url1, err1 := url.Parse(rs1)
if err1 != nil {
log.Println(err1)
}
proxy := httputil.NewSingleHostReverseProxy(url1)
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
}
ReverseProxy 更改内容支持
// ModifyResponse is an optional function that modifies the
// Response from the backend. It is called if the backend
// returns a response at all, with any HTTP status code.
// If the backend is unreachable, the optional ErrorHandler is
// called without any call to ModifyResponse.
//
// Hop-by-hop headers are removed from the response before
// calling ModifyResponse. ModifyResponse may need to remove
// additional headers to fit its deployment model, such as Alt-Svc.
//
// If ModifyResponse returns an error, ErrorHandler is called
// with its error value. If ErrorHandler is nil, its default
// implementation is used.
ModifyResponse func(*http.Response) error
NewSingleHostReverseProxy 部分实现的功能如下:
func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {
director := func(req *http.Request) {
rewriteRequestURL(req, target)
}
return &ReverseProxy{Director: director}
}
func rewriteRequestURL(req *http.Request, target *url.URL) {、
// http:127.0.0.1:2002/dir?name=123
// RawQuery: name=123
// Scheme: http
// Host: 127.0.0.1:2002
targetQuery := target.RawQuery
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path, req.URL.RawPath = joinURLPath(target, req.URL)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
}
当我们希望对后端返回的响应内容进行修改时,可以实现自定义的 modifyResponse ,随后通过创建自定义的 ReverseProxy 而不是使用 NewSingleHostReverseProxy 来实现自定义函数的挂载,实现代码如下:
package main
import (
"bytes"
"io"
"log"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
)
var addr = "127.0.0.1:2002"
func main() {
rs1 := "http://127.0.0.1:2003/base"
url1, err1 := url.Parse(rs1)
if err1 != nil {
log.Println(err1)
}
// proxy := httputil.NewSingleHostReverseProxy(url1)
proxy := &httputil.ReverseProxy{
Rewrite: func(r *httputil.ProxyRequest) {
r.SetURL(url1)
r.Out.Host = r.In.Host
},
ModifyResponse: modifyResponse,
}
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
}
func modifyResponse(resp *http.Response) error {
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil
}
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
resp.Body.Close()
modifiedBody := append([]byte("Modified: "), bodyBytes...)
resp.Body = io.NopCloser(bytes.NewReader(modifiedBody))
resp.ContentLength = int64(len(modifiedBody))
resp.Header.Set("Content-Length", strconv.Itoa(len(modifiedBody)))
return nil
}
ReverseProxy 特殊 Header
Connection
标记请求发起方与第一代理的状态
决定当前事务完成后,是否会关闭网络
- Connection: keep-alive 不关闭网络
- Connection: close 关闭网络
- Connection: upgrade 协议升级
TE 、Trailer
- TE 是 request_header,表示希望使用的传输编码类型
如:TE: trailers, deflate; q=0.5 表示,期望在采用分块传输编码响应中接收挂载字段, zlib 编码,0.5优先级排序
- Trailer 是 response_header,允许发送方在消息后面添加额外的元信息
如:Trailer: Expires 表示,Expires 将出现在分块信息的结尾
第一代理除去标准的逐段传输头(hop-by-hop)
- 逐段传输头都需要在 Connection 头中列出
- 第一个代理知道必须处理它们且不转发它们
- 逐段传输头:Keep-Alive, Transfer-Encoding, TE, Connection, Trailer, Upgrade, Proxy-Authorization, Proxy-Authenticate
ReverseProxy 补充知识——特殊 StatusCode
- 100 表示目前一切正常,客户端可以继续请求
- 101 表示服务端发送给客户端升级协议的请求
100-continue
客户端要 POST 的数据大于 1024 字节的时候,客户端不会直接发起 POST 请求,而是会分两步:
- 发送一个请求,包含一个 Expect:100-continue,询问 Server 是否愿意接收数据
- 接收到 Server 返回的 100-continue 应答以后,返回 100 状态,才把数据 POST 给 Server
101-Switching Protocols

ReverseProxy 源码分析
核心结构体代码如下:
// ReverseProxy 是一个反向代理处理器,将请求转发给后端服务器,并将响应返回给客户端。
type ReverseProxy struct {
// Rewrite 修改请求,生成发往后端的新请求。不能与 Director 同时设置。
// 推荐使用 Rewrite,它更可控。
Rewrite func(*ProxyRequest)
// Director 修改请求,功能同 Rewrite,是旧版本的方式。不能与 Rewrite 同时设置。
Director func(*http.Request)
// Transport 执行代理请求,默认使用 http.DefaultTransport。
Transport http.RoundTripper
// FlushInterval 控制响应刷新的间隔。0 表示不主动刷新,负数表示每次写入后立即刷新。
FlushInterval time.Duration
// ErrorLog 记录代理过程中的错误,默认使用标准日志包。
ErrorLog *log.Logger
// BufferPool 用于复用缓冲区,提升性能。
BufferPool BufferPool
// ModifyResponse 可修改后端返回的响应(任意状态码)。若返回错误,则调用 ErrorHandler。
ModifyResponse func(*http.Response) error
// ErrorHandler 处理后端不可达或 ModifyResponse 返回的错误。默认返回 502 错误。
ErrorHandler func(http.ResponseWriter, *http.Request, error)
}
ServeHTTP 核心逻辑:
// ServeHTTP 实现了 http.Handler 接口,处理反向代理的核心逻辑。
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// 确定使用的传输层(Transport)
transport := p.Transport
if transport == nil {
transport = http.DefaultTransport
}
ctx := req.Context()
// 处理客户端连接关闭的通知(仅当请求上下文没有取消信号时)
if ctx.Done() != nil {
// 如果请求上下文已包含取消信号(如超时、主动取消),
// 则不需要再额外监听 CloseNotify(它已被上下文取代)。
// 这里不做任何操作。
} else if cn, ok := rw.(http.CloseNotifier); ok {
// 如果响应器支持 CloseNotifier(即可以感知客户端连接关闭),
// 则创建一个可取消的上下文,并在单独的 goroutine 中监听关闭通知。
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
notifyChan := cn.CloseNotify()
go func() {
select {
case <-notifyChan:
cancel() // 客户端连接关闭,取消上下文
case <-ctx.Done():
// 上下文已被取消(例如请求处理完成),退出 goroutine
}
}()
}
// 克隆原始请求,以便后续修改(如修改 URL、Header 等),同时绑定新的上下文 ctx。
outreq := req.Clone(ctx)
if req.ContentLength == 0 {
// 如果原始请求内容长度为 0,将出站请求的 Body 设为 nil,
// 避免 http.Transport 重试时出现问题(参见 Issue 16036)。
outreq.Body = nil
}
if outreq.Body != nil {
// 确保出站请求的 Body 最终被关闭,防止 goroutine 泄露。
// 注意:RoundTrip 可能在处理程序返回后仍在读取 Body,
// 关闭 Body 可以避免崩溃(参见 Issue 46866)。
defer outreq.Body.Close()
}
if outreq.Header == nil {
// 保证 Header 不为 nil,保持历史行为(参见 Issue 33142)
outreq.Header = make(http.Header)
}
// 检查是否同时设置了 Director 和 Rewrite(两者互斥)
if (p.Director != nil) == (p.Rewrite != nil) {
p.getErrorHandler()(rw, req, errors.New("ReverseProxy 必须且只能设置 Director 或 Rewrite 中的一个"))
return
}
// 如果使用 Director 方式,调用它修改请求
if p.Director != nil {
p.Director(outreq)
if outreq.Form != nil {
// 清理无法解析的查询参数
outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery)
}
}
outreq.Close = false // 禁用出站请求的 Close 标志(让 Transport 自行决定是否重用连接)
// 检查是否为协议升级请求(如 WebSocket)
reqUpType := upgradeType(outreq.Header)
if !ascii.IsPrint(reqUpType) {
// 协议名称必须是可打印字符,否则拒绝
p.getErrorHandler()(rw, req, fmt.Errorf("客户端尝试切换到无效协议 %q", reqUpType))
return
}
// 移除逐跳头部(hop-by-hop headers),如 Connection、Keep-Alive 等
removeHopByHopHeaders(outreq.Header)
// 如果原始请求中声明支持 trailer,则在出站请求中也声明支持(参见 Issue 21096)
if httpguts.HeaderValuesContainsToken(req.Header["Te"], "trailers") {
outreq.Header.Set("Te", "trailers")
}
// 如果是协议升级请求,重新添加必要的 Connection 和 Upgrade 头部
if reqUpType != "" {
outreq.Header.Set("Connection", "Upgrade")
outreq.Header.Set("Upgrade", reqUpType)
}
// 如果使用 Rewrite 方式
if p.Rewrite != nil {
// 移除客户端可能提供的转发头部,让 Rewrite 函数重新设置(如果需要)
outreq.Header.Del("Forwarded")
outreq.Header.Del("X-Forwarded-For")
outreq.Header.Del("X-Forwarded-Host")
outreq.Header.Del("X-Forwarded-Proto")
// 清理无法解析的查询参数
outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery)
// 构造 ProxyRequest 并调用 Rewrite 函数
pr := &ProxyRequest{
In: req,
Out: outreq,
}
p.Rewrite(pr)
outreq = pr.Out // 使用 Rewrite 可能返回修改后的新请求
} else {
// 使用 Director 方式时的默认行为:设置 X-Forwarded-For 头部
if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
// 如果出站请求中已存在 X-Forwarded-For 头部,则将当前客户端 IP 追加到后面
prior, ok := outreq.Header["X-Forwarded-For"]
omit := ok && prior == nil // Issue 38079:nil 值表示不要设置该头部
if len(prior) > 0 {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
if !omit {
outreq.Header.Set("X-Forwarded-For", clientIP)
}
}
}
// 如果出站请求没有设置 User-Agent,则设为空字符串(避免使用默认的 Go User-Agent)
if _, ok := outreq.Header["User-Agent"]; !ok {
outreq.Header.Set("User-Agent", "")
}
// 用于跟踪 1xx 响应(如 100 Continue)的同步机制
var (
roundTripMutex sync.Mutex
roundTripDone bool
)
trace := &httptrace.ClientTrace{
Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
roundTripMutex.Lock()
defer roundTripMutex.Unlock()
if roundTripDone {
// 如果 RoundTrip 已经完成,不再修改响应头
return nil
}
h := rw.Header()
copyHeader(h, http.Header(header))
rw.WriteHeader(code)
// 对于 1xx 响应,WriteHeader 不会自动清除之前的头部,需要手动清除
clear(h)
return nil
},
}
// 将追踪对象绑定到出站请求的上下文中
outreq = outreq.WithContext(httptrace.WithClientTrace(outreq.Context(), trace))
// 使用 Transport 发送请求,获得后端响应
res, err := transport.RoundTrip(outreq)
roundTripMutex.Lock()
roundTripDone = true
roundTripMutex.Unlock()
if err != nil {
// 发生错误(如后端不可达),调用错误处理器
p.getErrorHandler()(rw, outreq, err)
return
}
// 处理协议升级响应(101 Switching Protocols),如 WebSocket
if res.StatusCode == http.StatusSwitchingProtocols {
if !p.modifyResponse(rw, res, outreq) {
return
}
p.handleUpgradeResponse(rw, outreq, res)
return
}
// 移除响应中的逐跳头部
removeHopByHopHeaders(res.Header)
// 调用 ModifyResponse 钩子(如果设置),允许修改响应
if !p.modifyResponse(rw, res, outreq) {
return
}
// 将后端响应的头部复制到客户端响应中
copyHeader(rw.Header(), res.Header)
// 处理 trailer(尾部)头部:Transport 不会自动包含 Trailer 头部,需要从 res.Trailer 构建
announcedTrailers := len(res.Trailer)
if announcedTrailers > 0 {
trailerKeys := make([]string, 0, len(res.Trailer))
for k := range res.Trailer {
trailerKeys = append(trailerKeys, k)
}
rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
}
// 写入响应状态码
rw.WriteHeader(res.StatusCode)
// 将响应体复制到客户端,并根据 flushInterval 刷新
err = p.copyResponse(rw, res.Body, p.flushInterval(res))
if err != nil {
defer res.Body.Close()
// 复制过程中出错,根据是否应该 panic 决定处理方式
if !shouldPanicOnCopyError(req) {
p.logf("测试中抑制 copyResponse 错误导致的 panic;copy 错误:%v", err)
return
}
// 触发中止处理器的 panic,中断请求
panic(http.ErrAbortHandler)
}
res.Body.Close() // 现在关闭 Body 以填充 res.Trailer
// 如果响应中带有 trailer,强制分块传输(避免添加 Content-Length)
if len(res.Trailer) > 0 {
http.NewResponseController(rw).Flush()
}
// 写入 trailer 头部
if len(res.Trailer) == announcedTrailers {
// 简单情况:直接复制 trailer
copyHeader(rw.Header(), res.Trailer)
return
}
// 复杂情况:某些 trailer 可能未被预先声明,使用 TrailerPrefix 前缀写入
for k, vv := range res.Trailer {
k = http.TrailerPrefix + k
for _, v := range vv {
rw.Header().Add(k, v)
}
}
}