feed: make non-blocking, engine: move around dispatch call for perf
This commit is contained in:
+2
-3
@@ -99,12 +99,11 @@ func (e *engine) dispatch(p io.Packet) bool {
|
||||
_ = e.io.SetVerdict(p, io.VerdictAcceptStream, nil)
|
||||
return true
|
||||
}
|
||||
// Load balance by stream ID
|
||||
index := p.StreamID() % uint32(len(e.workers))
|
||||
packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true})
|
||||
e.workers[index].Feed(&workerPacket{
|
||||
StreamID: p.StreamID(),
|
||||
Packet: packet,
|
||||
Data: data,
|
||||
LayerType: layerType,
|
||||
SrcMAC: srcMAC,
|
||||
DstMAC: dstMAC,
|
||||
SetVerdict: func(v io.Verdict, b []byte) error {
|
||||
|
||||
+9
-4
@@ -22,7 +22,8 @@ const (
|
||||
|
||||
type workerPacket struct {
|
||||
StreamID uint32
|
||||
Packet gopacket.Packet
|
||||
Data []byte
|
||||
LayerType gopacket.LayerType
|
||||
SrcMAC net.HardwareAddr
|
||||
DstMAC net.HardwareAddr
|
||||
SetVerdict func(io.Verdict, []byte) error
|
||||
@@ -111,7 +112,11 @@ func newWorker(config workerConfig) (*worker, error) {
|
||||
}
|
||||
|
||||
func (w *worker) Feed(p *workerPacket) {
|
||||
w.packetChan <- p
|
||||
select {
|
||||
case w.packetChan <- p:
|
||||
default:
|
||||
_ = p.SetVerdict(io.VerdictAccept, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *worker) Run(ctx context.Context) {
|
||||
@@ -123,10 +128,10 @@ func (w *worker) Run(ctx context.Context) {
|
||||
return
|
||||
case wPkt := <-w.packetChan:
|
||||
if wPkt == nil {
|
||||
// Closed
|
||||
return
|
||||
}
|
||||
v, b := w.handle(wPkt.StreamID, wPkt.Packet, wPkt.SrcMAC, wPkt.DstMAC)
|
||||
pkt := gopacket.NewPacket(wPkt.Data, wPkt.LayerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true})
|
||||
v, b := w.handle(wPkt.StreamID, pkt, wPkt.SrcMAC, wPkt.DstMAC)
|
||||
_ = wPkt.SetVerdict(v, b)
|
||||
}
|
||||
}
|
||||
|
||||
Binary file not shown.
Reference in New Issue
Block a user