addon websocket interpect backend

addon-dailer
liqiang 4 years ago
parent f9d885296e
commit 6fc1e38f90

@ -2,6 +2,7 @@ package web
import ( import (
"encoding/json" "encoding/json"
"strings"
"sync" "sync"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -14,6 +15,8 @@ type concurrentConn struct {
waitChans map[string]chan interface{} waitChans map[string]chan interface{}
waitChansMu sync.Mutex waitChansMu sync.Mutex
interceptUri string
} }
func newConn(c *websocket.Conn) *concurrentConn { func newConn(c *websocket.Conn) *concurrentConn {
@ -24,6 +27,10 @@ func newConn(c *websocket.Conn) *concurrentConn {
} }
func (c *concurrentConn) writeMessage(msg *message, f *flow.Flow) { func (c *concurrentConn) writeMessage(msg *message, f *flow.Flow) {
if c.isIntercpt(f, msg) {
msg.waitIntercept = 1
}
c.mu.Lock() c.mu.Lock()
err := c.conn.WriteMessage(websocket.BinaryMessage, msg.bytes()) err := c.conn.WriteMessage(websocket.BinaryMessage, msg.bytes())
c.mu.Unlock() c.mu.Unlock()
@ -32,8 +39,10 @@ func (c *concurrentConn) writeMessage(msg *message, f *flow.Flow) {
return return
} }
if msg.waitIntercept == 1 {
c.waitIntercept(f, msg) c.waitIntercept(f, msg)
} }
}
func (c *concurrentConn) readloop() { func (c *concurrentConn) readloop() {
for { for {
@ -54,6 +63,15 @@ func (c *concurrentConn) readloop() {
continue continue
} }
if msg.mType == messageTypeChangeInterceptUri {
interceptUri := ""
if len(msg.content) > 0 {
interceptUri = string(msg.content)
}
c.interceptUri = interceptUri
continue
}
if msg.mType == messageTypeChangeRequest { if msg.mType == messageTypeChangeRequest {
req := new(flow.Request) req := new(flow.Request)
err := json.Unmarshal(msg.content, req) err := json.Unmarshal(msg.content, req)
@ -84,15 +102,21 @@ func (c *concurrentConn) initWaitChan(key string) chan interface{} {
// 是否拦截 // 是否拦截
func (c *concurrentConn) isIntercpt(f *flow.Flow, after *message) bool { func (c *concurrentConn) isIntercpt(f *flow.Flow, after *message) bool {
if after.mType != messageTypeRequest {
return false return false
} }
// 拦截 if c.interceptUri == "" {
func (c *concurrentConn) waitIntercept(f *flow.Flow, after *message) { return false
if !c.isIntercpt(f, after) { }
return if strings.Contains(f.Request.URL.String(), c.interceptUri) {
return true
}
return false
} }
// 拦截
func (c *concurrentConn) waitIntercept(f *flow.Flow, after *message) {
log.Infof("waiting Intercept: %s\n", f.Request.URL) log.Infof("waiting Intercept: %s\n", f.Request.URL)
ch := c.initWaitChan(f.Id.String()) ch := c.initWaitChan(f.Id.String())
req := (<-ch).(*flow.Request) req := (<-ch).(*flow.Request)

@ -18,10 +18,11 @@ const (
messageTypeResponseBody messageType = 3 messageTypeResponseBody messageType = 3
messageTypeChangeRequest messageType = 11 messageTypeChangeRequest messageType = 11
messageTypeChangeInterceptUri messageType = 21
) )
func validMessageType(t byte) bool { func validMessageType(t byte) bool {
if t == byte(messageTypeRequest) || t == byte(messageTypeResponse) || t == byte(messageTypeResponseBody) || t == byte(messageTypeChangeRequest) { if t == byte(messageTypeRequest) || t == byte(messageTypeResponse) || t == byte(messageTypeResponseBody) || t == byte(messageTypeChangeRequest) || t == byte(messageTypeChangeInterceptUri) {
return true return true
} }
return false return false
@ -30,6 +31,7 @@ func validMessageType(t byte) bool {
type message struct { type message struct {
mType messageType mType messageType
id uuid.UUID id uuid.UUID
waitIntercept byte
content []byte content []byte
} }
@ -42,7 +44,7 @@ func newMessage(mType messageType, id uuid.UUID, content []byte) *message {
} }
func parseMessage(data []byte) *message { func parseMessage(data []byte) *message {
if len(data) < 38 { if len(data) < 39 {
return nil return nil
} }
if data[0] != messageVersion { if data[0] != messageVersion {
@ -52,12 +54,14 @@ func parseMessage(data []byte) *message {
return nil return nil
} }
id, err := uuid.FromString(string(data[2:38])) id, err := uuid.FromString(string(data[3:39]))
if err != nil { if err != nil {
return nil return nil
} }
return newMessage(messageType(data[1]), id, data[38:]) msg := newMessage(messageType(data[1]), id, data[39:])
msg.waitIntercept = data[2]
return msg
} }
func newMessageRequest(f *flow.Flow) *message { func newMessageRequest(f *flow.Flow) *message {
@ -84,6 +88,7 @@ func (m *message) bytes() []byte {
buf := bytes.NewBuffer(make([]byte, 0)) buf := bytes.NewBuffer(make([]byte, 0))
buf.WriteByte(byte(messageVersion)) buf.WriteByte(byte(messageVersion))
buf.WriteByte(byte(m.mType)) buf.WriteByte(byte(m.mType))
buf.WriteByte(m.waitIntercept)
buf.WriteString(m.id.String()) // len: 36 buf.WriteString(m.id.String()) // len: 36
buf.Write(m.content) buf.Write(m.content)
return buf.Bytes() return buf.Bytes()

Loading…
Cancel
Save