bfe业余分析(二)

这篇主要分析一下bfe的主服务流程。bfe的整个http服务都是自己实现的,这样相比直接使用标准库会有更强的控制力。

整体结构

先看下源码目录结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
$ tree -L 1
.
├── ADOPTERS.md
├── bfe_balance
├── bfe_basic
├── bfe_bufio
├── bfe_config
├── bfe_debug
├── bfe.go
├── bfe_http
├── bfe_http2
├── bfe_module
├── bfe_modules
├── bfe_net
├── bfe_proxy
├── bfe_route
├── bfe_server
├── bfe_spdy
├── bfe_stream
├── bfe_tls
├── bfe_util
├── bfe_websocket
├── CHANGELOG.md
├── CODE_OF_CONDUCT.md
├── conf
├── CONTRIBUTING.md
├── CONTRIBUTORS.md
├── Dockerfile
├── docs
├── go.mod
├── go.sum
├── LICENSE
├── log
├── Makefile
├── NOTICE
├── output
├── README.md
├── SECURITY.md
├── snap
└── VERSION

23 directories, 15 files

bfe.go是主函数,主流程主要放在了bfe_server中。

主流程

bfe.go中主要执行配置加载,日志初始化与服务启动:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*
    上面的都省略了
*/

    log.Logger.Info("bfe[version:%s] start", version)

    // load server config
    confPath := path.Join(*confRoot, "bfe.conf")
    config, err = bfe_conf.BfeConfigLoad(confPath, *confRoot)
    if err != nil {
        log.Logger.Error("main(): in BfeConfigLoad():%s", err.Error())
        bfe_util.AbnormalExit()
    }

    // set maximum number of cpus
    runtime.GOMAXPROCS(config.Server.MaxCpus)

    // set log level
    bfe_debug.SetDebugFlag(config.Server)

/*
    bfe_server.StartUp进入服务启动流程
*/
    // start and serve
    if err = bfe_server.StartUp(config, version, *confRoot); err != nil {
        log.Logger.Error("main(): bfe_server.StartUp(): %s", err.Error())
    }

    // waiting for logger finish jobs
    time.Sleep(1 * time.Second)
    log.Logger.Close()

接下来看看bfe_server.StartUp()函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/*
    bfe_server/bfe_server_init.go
*/

// StartUp builds a BfeServer from cfg, runs every initialization step in
// order, and then blocks until one of the accept goroutines reports an error.
//
// Params:
//     - cfg     : loaded bfe configuration
//     - version : version string handed to NewBfeServer
//     - confRoot: configuration root directory handed to InitModules
//
// Return:
//     - the error of the first initialization step that fails, otherwise the
//       first value delivered by any ServeHttp/ServeHttps goroutine
func StartUp(cfg bfe_conf.BfeConfig, version string, confRoot string) error {
    // listeners are created before anything else
    listeners, err := createListeners(cfg)
    if err != nil {
        log.Logger.Error("StartUp(): createListeners():%s", err.Error())
        return err
    }

    // make the module set available before the server is constructed
    bfe_modules.SetModules()

    // the server object that all following steps operate on
    bfeServer := NewBfeServer(cfg, listeners, version)

    // http initialization
    if err = bfeServer.InitHttp(); err != nil {
        log.Logger.Error("StartUp(): InitHttp():%s", err.Error())
        return err
    }

    // https initialization
    if err = bfeServer.InitHttps(); err != nil {
        log.Logger.Error("StartUp(): InitHttps():%s", err.Error())
        return err
    }

    // data loading
    if err = bfeServer.InitDataLoad(); err != nil {
        log.Logger.Error("StartUp(): bfeServer.InitDataLoad():%s", err.Error())
        return err
    }
    log.Logger.Info("StartUp(): bfeServer.InitDataLoad() OK")

    // signal table
    bfeServer.InitSignalTable()
    log.Logger.Info("StartUp():bfeServer.InitSignalTable() OK")

    // web monitor, bound to the configured monitor port
    if err = bfeServer.InitWebMonitor(cfg.Server.MonitorPort); err != nil {
        log.Logger.Error("StartUp(): InitWebMonitor():%s", err.Error())
        return err
    }

    // modules are registered first ...
    if err = bfeServer.RegisterModules(cfg.Server.Modules); err != nil {
        log.Logger.Error("StartUp(): RegisterModules():%s", err.Error())
        return err
    }

    // ... and initialized afterwards
    if err = bfeServer.InitModules(confRoot); err != nil {
        log.Logger.Error("StartUp(): bfeServer.InitModules():%s", err.Error())
        return err
    }
    log.Logger.Info("StartUp():bfeServer.InitModules() OK")

    // embedded web server of the monitor
    bfeServer.Monitor.Start()

    // every accept goroutine reports its result on this unbuffered channel
    serveChan := make(chan error)

    /*
        Blog author's note: I do not fully understand why several goroutines
        run the ServeHttp logic below; each of them simply calls accept().
        Perhaps it is related to reuseport? AcceptNum defaults to 1.
    */

    // AcceptNum goroutines accepting http connections
    for n := 0; n < cfg.Server.AcceptNum; n++ {
        go func() {
            serveChan <- bfeServer.ServeHttp(bfeServer.HttpListener)
        }()
    }

    // AcceptNum goroutines accepting https connections
    for n := 0; n < cfg.Server.AcceptNum; n++ {
        go func() {
            serveChan <- bfeServer.ServeHttps(bfeServer.HttpsListener)
        }()
    }

    // block until the first accept goroutine gives up, and surface its result
    return <-serveChan
}

bfeServer.ServeHttp和bfeServer.ServeHttps底层其实都是调用Serve()函数,每建立一个连接就创建一个goroutine去处理该连接上的请求:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/*
    bfe_server/http_server.go
*/

// Serve accepts incoming connections on the Listener l and starts a new
// goroutine running c.serve() for each accepted connection.
//
// Accept errors are handled as follows:
//     - timeout errors are counted and the loop continues immediately
//     - temporary net errors are retried after a delay computed by delayCalc
//     - any other error ends the loop and is returned; when the server is in
//       graceful shutdown, the return is delayed by GracefulShutdownTimeout seconds
//
// Params
//     - l    : net listener
//     - raw  : underlying tcp listener (different from `l` in HTTPS);
//              not used by the body shown here
//     - proto: protocol label; not used by the body shown here
//
// Return
//     - err: the non-retryable error returned by l.Accept()
func (srv *BfeServer) Serve(l net.Listener, raw net.Listener, proto string) error {
    var tempDelay time.Duration // how long to sleep on accept failure
    proxyState := srv.serverStatus.ProxyState

    for {
        // accept new connection
        rw, e := l.Accept()
        if e != nil {
            if isTimeout(e) {
                proxyState.ErrClientTimeout.Inc(1)
                continue
            }
            proxyState.ErrClientConnAccept.Inc(1)

            if ne, ok := e.(net.Error); ok && ne.Temporary() {
                tempDelay = delayCalc(tempDelay)

                log.Logger.Error("http: Accept error: %v; retrying in %v", e, tempDelay)
                time.Sleep(tempDelay)
                continue
            }

            // if in GraceShutdown state, exit accept loop after timeout
            if srv.CheckGracefulShutdown() {
                shutdownTimeout := srv.Config.Server.GracefulShutdownTimeout
                time.Sleep(time.Duration(shutdownTimeout) * time.Second)
            }

            return e
        }

        // A successful accept ends the current run of temporary failures, so
        // the next failure must start its backoff from zero again instead of
        // continuing from the delay reached by an earlier, unrelated failure.
        tempDelay = 0

        // create data structure for new connection
        c, err := newConn(rw, srv)
        if err != nil {
            // Noted as unreachable at present. Should it ever happen, release
            // the accepted connection rather than leaking it; the close error
            // is deliberately ignored because the connection is discarded.
            _ = rw.Close()
            continue
        }

        // start go-routine for new connection
        go c.serve()
    }
}

c.serve()中主要是读取和解析http请求,然后调用c.serveRequest(w, request),而这个函数又会调用c.server.ReverseProxy.ServeHTTP(w, request)。ServeHTTP可以认为是上层业务的入口,里面的逻辑可以和之前介绍的概念对应上。其中包含若干个阶段的hook,供外置模块执行:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
// ServeHTTP processes http request and send http response.
//
// Params:
//    - rw : context for sending response
//    - request: context for request
//
// Return:
//    - action: action to do after ServeHTTP
func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic.Request) (action int) {
    var err error
    var res *bfe_http.Response
    var hl *bfe_module.HandlerList
    var retVal int
    var clusterName string
    var cluster *bfe_cluster.BfeCluster
    var outreq *bfe_http.Request
    var serverConf *bfe_route.ServerDataConf
    var writeTimer *time.Timer

    req := basicReq.HttpRequest
    isRedirect := false
    resFlushInterval := time.Duration(0)
    cancelOnClientClose := false

    // get instance of BfeServer
    srv := p.server

    // set clientip of original user for request
    setClientAddr(basicReq)

    // Callback for HANDLE_BEFORE_LOCATION
    hl = srv.CallBacks.GetHandlerList(bfe_module.HandleBeforeLocation)
    if hl != nil {
        retVal, res = hl.FilterRequest(basicReq)
        basicReq.HttpResponse = res
        switch retVal {
        case bfe_module.BfeHandlerClose:
            // close the connection directly (with no response)
            action = closeDirectly
            return
        case bfe_module.BfeHandlerFinish:
            // close the connection after response
            action = closeAfterReply
            basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
            return
        case bfe_module.BfeHandlerRedirect:
            // make redirect
            Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code)
            isRedirect = true
            basicReq.BfeStatusCode = basicReq.Redirect.Code
            goto send_response
        case bfe_module.BfeHandlerResponse:
            goto response_got
        }
    }

    // find product
    if err := srv.findProduct(basicReq); err != nil {
        basicReq.ErrCode = bfe_basic.ErrBkFindProduct
        basicReq.ErrMsg = err.Error()
        p.proxyState.ErrBkFindProduct.Inc(1)
        log.Logger.Info("FindProduct error[%s] host[%s] vip[%s] clientip[%s]", err.Error(),
            basicReq.HttpRequest.Host, basicReq.Session.Vip, basicReq.ClientAddr)

        // close connection
        res = bfe_basic.CreateInternalSrvErrResp(basicReq)
        action = closeAfterReply
        goto response_got
    }

    // Callback for HandleFoundProduct
    hl = srv.CallBacks.GetHandlerList(bfe_module.HandleFoundProduct)
    if hl != nil {
        retVal, res = hl.FilterRequest(basicReq)
        basicReq.HttpResponse = res
        switch retVal {
        case bfe_module.BfeHandlerClose:
            // close the connection directly (with no response)
            action = closeDirectly
            return
        case bfe_module.BfeHandlerFinish:
            // close the connection after response
            action = closeAfterReply
            basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
            return
        case bfe_module.BfeHandlerRedirect:
            // make redirect
            Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code)
            isRedirect = true
            basicReq.BfeStatusCode = basicReq.Redirect.Code
            goto send_response
        case bfe_module.BfeHandlerResponse:
            goto response_got
        }
    }

    // find cluster
    if err = srv.findCluster(basicReq); err != nil {
        basicReq.ErrCode = bfe_basic.ErrBkFindLocation
        basicReq.ErrMsg = err.Error()
        p.proxyState.ErrBkFindLocation.Inc(1)
        log.Logger.Info("FindLocation error[%s] host[%s]", err, basicReq.HttpRequest.Host)

        // close connection
        res = bfe_basic.CreateInternalSrvErrResp(basicReq)
        action = closeAfterReply
        goto response_got
    }
    clusterName = basicReq.Route.ClusterName

    // look up for cluster
    serverConf = basicReq.SvrDataConf.(*bfe_route.ServerDataConf)
    cluster, err = serverConf.ClusterTable.Lookup(clusterName)
    if err != nil {
        log.Logger.Warn("no cluster for %s", clusterName)
        basicReq.Stat.ResponseStart = time.Now()
        basicReq.ErrCode = bfe_basic.ErrBkNoCluster
        basicReq.ErrMsg = err.Error()
        p.proxyState.ErrBkNoCluster.Inc(1)

        res = bfe_basic.CreateInternalSrvErrResp(basicReq)
        action = closeAfterReply
        goto response_got
    }

    basicReq.Backend.ClusterName = clusterName

    // set deadline to finish read client request body
    p.setTimeout(bfe_basic.StageReadReqBody, basicReq.Connection, req, cluster.TimeoutReadClient())

    // Callback for HandleAfterLocation
    hl = srv.CallBacks.GetHandlerList(bfe_module.HandleAfterLocation)
    if hl != nil {
        retVal, res = hl.FilterRequest(basicReq)
        basicReq.HttpResponse = res
        switch retVal {
        case bfe_module.BfeHandlerClose:
            // close the connection directly (with no response)
            action = closeDirectly
            return
        case bfe_module.BfeHandlerFinish:
            // close the connection after response
            action = closeAfterReply
            basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
            return
        case bfe_module.BfeHandlerRedirect:
            // make redirect
            Redirect(rw, req, basicReq.Redirect.Url, basicReq.Redirect.Code)

            isRedirect = true

            basicReq.BfeStatusCode = basicReq.Redirect.Code
            goto send_response
        case bfe_module.BfeHandlerResponse:
            goto response_got
        }
    }

    if bfe_debug.DebugServHTTP {
        log.Logger.Debug("ReverseProxy.ServeHTTP(): cluster name = %s", clusterName)
    }

    // prepare out request to downstream RS backend
    outreq = new(bfe_http.Request)
    *outreq = *req // includes shallow copies of maps, but okay
    basicReq.OutRequest = outreq

    // set http proto for out request
    httpProtoSet(outreq)
    // remove hop-by-hop headers
    hopByHopHeaderRemove(outreq, req)

    // invoke cluster to get response
    res, action, err = p.clusterInvoke(srv, cluster, basicReq, rw)
    basicReq.HttpResponse = res
    if err != nil {
        basicReq.Stat.ResponseStart = time.Now()
        basicReq.BfeStatusCode = bfe_http.StatusInternalServerError
        res = bfe_basic.CreateInternalSrvErrResp(basicReq)
        goto response_got
    }
    resFlushInterval = cluster.ResFlushInterval()
    cancelOnClientClose = cluster.CancelOnClientClose()
    if resFlushInterval == 0 && basicReq.HttpRequest.Header.Get("Accept") == "text/event-stream" {
        resFlushInterval = cluster.DefaultSSEFlushInterval()
    }

    // timeout for write response to client
    // Note: we use io.Copy() to read from backend and write to client.
    // For avoid from blocking on client conn or backend conn forever,
    // we must timeout both conns after specified duration.
    p.setTimeout(bfe_basic.StageWriteClient, basicReq.Connection, req, cluster.TimeoutWriteClient())
    writeTimer = time.AfterFunc(cluster.TimeoutWriteClient(), func() {
        transport := basicReq.Trans.Transport.(*bfe_http.Transport)
        transport.CancelRequest(basicReq.OutRequest) // force close connection to backend
    })
    defer writeTimer.Stop()

    // for read next request
    defer p.setTimeout(bfe_basic.StageEndRequest, basicReq.Connection, req, cluster.TimeoutReadClientAgain())

/*
    后面先省略...
*/

这个函数执行完基本上就完成了一次请求的处理。