addon logic

addon-dailer
lqqyt2423 4 years ago
parent c41bf7c881
commit c07364b358

@ -12,6 +12,6 @@
- [x] 经内存转发 https 流量 - [x] 经内存转发 https 流量
- [x] 忽略某些错误例如broken pipe, reset by peer, timeout - [x] 忽略某些错误例如broken pipe, reset by peer, timeout
- [x] websocket - [x] websocket
- [x] 插件机制
- [ ] support get method with body - [ ] support get method with body
- [ ] http2 - [ ] http2
- [ ] 插件机制

@ -0,0 +1,91 @@
package flow
import (
"net/http"
"net/url"
"time"
_log "github.com/sirupsen/logrus"
)
var log = _log.WithField("at", "flow")
type Request struct {
Method string
URL *url.URL
Proto string
Header http.Header
Body []byte
}
type Response struct {
StatusCode int
Header http.Header
Body []byte
}
type Flow struct {
*Request
*Response
// https://docs.mitmproxy.org/stable/overview-features/#streaming
// 如果为 true则不缓冲 Request.Body 和 Response.Body且不进入之后的 Addon.Request 和 Addon.Response
Stream bool
done chan struct{}
}
func NewFlow() *Flow {
return &Flow{done: make(chan struct{})}
}
func (f *Flow) Done() <-chan struct{} {
return f.done
}
func (f *Flow) Finish() {
close(f.done)
}
type Addon interface {
// HTTP request headers were successfully read. At this point, the body is empty.
Requestheaders(*Flow)
// The full HTTP request has been read.
Request(*Flow)
// HTTP response headers were successfully read. At this point, the body is empty.
Responseheaders(*Flow)
// The full HTTP response has been read.
Response(*Flow)
}
// BaseAddon do nothing
type BaseAddon struct{}
func (addon *BaseAddon) Requestheaders(*Flow) {}
func (addon *BaseAddon) Request(*Flow) {}
func (addon *BaseAddon) Responseheaders(*Flow) {}
func (addon *BaseAddon) Response(*Flow) {}
// LogAddon log http record
type LogAddon struct {
BaseAddon
}
func (addon *LogAddon) Requestheaders(flo *Flow) {
log := log.WithField("in", "LogAddon")
start := time.Now()
go func() {
<-flo.Done()
var StatusCode int
if flo.Response != nil {
StatusCode = flo.Response.StatusCode
}
var contentLen int
if flo.Response != nil && flo.Response.Body != nil {
contentLen = len(flo.Response.Body)
}
log.Infof("%v %v %v %v - %v ms\n", flo.Request.Method, flo.Request.URL.String(), StatusCode, contentLen, time.Since(start).Milliseconds())
}()
}

@ -0,0 +1,26 @@
package proxy
import (
"bytes"
"io"
)
// 尝试将 Reader 读取至 buffer 中
func ReaderToBuffer(r io.Reader, limit int64) ([]byte, io.Reader, error) {
buf := bytes.NewBuffer(make([]byte, 0))
lr := io.LimitReader(r, limit)
_, err := io.Copy(buf, lr)
if err != nil {
return nil, nil, err
}
// 达到上限
if int64(buf.Len()) == limit {
// 返回新的 Reader
return nil, io.MultiReader(bytes.NewBuffer(buf.Bytes()), r), nil
}
// 返回 buffer
return buf.Bytes(), nil, nil
}

@ -1,6 +1,7 @@
package proxy package proxy
import ( import (
"bytes"
"crypto/tls" "crypto/tls"
"io" "io"
"net" "net"
@ -10,6 +11,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/lqqyt2423/go-mitmproxy/flow"
_log "github.com/sirupsen/logrus" _log "github.com/sirupsen/logrus"
) )
@ -75,9 +77,15 @@ type Options struct {
} }
type Proxy struct { type Proxy struct {
Server *http.Server Server *http.Server
Client *http.Client Client *http.Client
Mitm Mitm Mitm Mitm
StreamLargeBodies int64
Addons []flow.Addon
}
func (proxy *Proxy) AddAddon(addon flow.Addon) {
proxy.Addons = append(proxy.Addons, addon)
} }
func (proxy *Proxy) Start() error { func (proxy *Proxy) Start() error {
@ -121,16 +129,92 @@ func (proxy *Proxy) ServeHTTP(res http.ResponseWriter, req *http.Request) {
return return
} }
start := time.Now() endRes := func(response *flow.Response, body io.Reader) {
if response.Header != nil {
for key, value := range response.Header {
for _, v := range value {
res.Header().Add(key, v)
}
}
}
res.WriteHeader(response.StatusCode)
if body != nil {
_, err := io.Copy(res, body)
if err != nil && !ignoreErr(log, err) {
log.Error(err)
}
} else if response.Body != nil && len(response.Body) > 0 {
_, err := res.Write(response.Body)
if err != nil && !ignoreErr(log, err) {
log.Error(err)
}
}
}
// when addons panic
defer func() {
if err := recover(); err != nil {
log.Warnf("Recovered: %v\n", err)
}
}()
flo := flow.NewFlow()
flo.Request = &flow.Request{
Method: req.Method,
URL: req.URL,
Proto: req.Proto,
Header: req.Header,
}
defer flo.Finish()
// trigger addon event Requestheaders
for _, addon := range proxy.Addons {
addon.Requestheaders(flo)
if flo.Response != nil {
endRes(flo.Response, nil)
return
}
}
// 读 request body
var reqBody io.Reader = req.Body
if !flo.Stream {
reqBuf, r, err := ReaderToBuffer(req.Body, proxy.StreamLargeBodies)
reqBody = r
if err != nil {
log.Error(err)
res.WriteHeader(502)
return
}
if reqBuf == nil {
log.Warnf("request body size >= %v\n", proxy.StreamLargeBodies)
flo.Stream = true
} else {
flo.Request.Body = reqBuf
}
// trigger addon event Request
if !flo.Stream {
for _, addon := range proxy.Addons {
addon.Request(flo)
if flo.Response != nil {
endRes(flo.Response, nil)
return
}
}
reqBody = bytes.NewReader(flo.Request.Body)
}
}
proxyReq, err := http.NewRequest(req.Method, req.URL.String(), req.Body) proxyReq, err := http.NewRequest(flo.Request.Method, flo.Request.URL.String(), reqBody)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
res.WriteHeader(502) res.WriteHeader(502)
return return
} }
for key, value := range req.Header { for key, value := range flo.Request.Header {
for _, v := range value { for _, v := range value {
proxyReq.Header.Add(key, v) proxyReq.Header.Add(key, v)
} }
@ -145,19 +229,46 @@ func (proxy *Proxy) ServeHTTP(res http.ResponseWriter, req *http.Request) {
} }
defer proxyRes.Body.Close() defer proxyRes.Body.Close()
for key, value := range proxyRes.Header { flo.Response = &flow.Response{
for _, v := range value { StatusCode: proxyRes.StatusCode,
res.Header().Add(key, v) Header: proxyRes.Header,
}
// trigger addon event Responseheaders
for _, addon := range proxy.Addons {
addon.Responseheaders(flo)
if flo.Response.Body != nil {
endRes(flo.Response, nil)
return
} }
} }
res.WriteHeader(proxyRes.StatusCode)
_, err = io.Copy(res, proxyRes.Body) // 读 response body
if err != nil && !ignoreErr(log, err) { var resBody io.Reader = proxyRes.Body
log.Error(err) if !flo.Stream {
return resBuf, r, err := ReaderToBuffer(proxyRes.Body, proxy.StreamLargeBodies)
resBody = r
if err != nil {
log.Error(err)
res.WriteHeader(502)
return
}
if resBuf == nil {
log.Warnf("response body size >= %v\n", proxy.StreamLargeBodies)
flo.Stream = true
} else {
flo.Response.Body = resBuf
}
// trigger addon event Response
if !flo.Stream {
for _, addon := range proxy.Addons {
addon.Response(flo)
}
}
} }
log.Infof("status code: %v cost %v ms\n", proxyRes.StatusCode, time.Since(start).Milliseconds()) endRes(flo.Response, resBody)
} }
func (proxy *Proxy) handleConnect(res http.ResponseWriter, req *http.Request) { func (proxy *Proxy) handleConnect(res http.ResponseWriter, req *http.Request) {
@ -233,6 +344,10 @@ func NewProxy(opts *Options) (*Proxy, error) {
proxy.Mitm = mitm proxy.Mitm = mitm
proxy.StreamLargeBodies = 1024 * 1024 * 5 // 5mb
proxy.Addons = make([]flow.Addon, 0)
proxy.AddAddon(&flow.LogAddon{})
return proxy, nil return proxy, nil
} }

Loading…
Cancel
Save