diff --git a/engine/engine.go b/engine/engine.go index 483476c..8bf7ff8 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -99,14 +99,13 @@ func (e *engine) dispatch(p io.Packet) bool { _ = e.io.SetVerdict(p, io.VerdictAcceptStream, nil) return true } - // Load balance by stream ID index := p.StreamID() % uint32(len(e.workers)) - packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true}) e.workers[index].Feed(&workerPacket{ - StreamID: p.StreamID(), - Packet: packet, - SrcMAC: srcMAC, - DstMAC: dstMAC, + StreamID: p.StreamID(), + Data: data, + LayerType: layerType, + SrcMAC: srcMAC, + DstMAC: dstMAC, SetVerdict: func(v io.Verdict, b []byte) error { return e.io.SetVerdict(p, v, b) }, diff --git a/engine/worker.go b/engine/worker.go index e146637..558ba72 100644 --- a/engine/worker.go +++ b/engine/worker.go @@ -22,7 +22,8 @@ const ( type workerPacket struct { StreamID uint32 - Packet gopacket.Packet + Data []byte + LayerType gopacket.LayerType SrcMAC net.HardwareAddr DstMAC net.HardwareAddr SetVerdict func(io.Verdict, []byte) error @@ -111,7 +112,11 @@ func newWorker(config workerConfig) (*worker, error) { } func (w *worker) Feed(p *workerPacket) { - w.packetChan <- p + select { + case w.packetChan <- p: + default: + _ = p.SetVerdict(io.VerdictAccept, nil) + } } func (w *worker) Run(ctx context.Context) { @@ -123,10 +128,10 @@ func (w *worker) Run(ctx context.Context) { return case wPkt := <-w.packetChan: if wPkt == nil { - // Closed return } - v, b := w.handle(wPkt.StreamID, wPkt.Packet, wPkt.SrcMAC, wPkt.DstMAC) + pkt := gopacket.NewPacket(wPkt.Data, wPkt.LayerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true}) + v, b := w.handle(wPkt.StreamID, pkt, wPkt.SrcMAC, wPkt.DstMAC) _ = wPkt.SetVerdict(v, b) } } diff --git a/ruleset/builtins/geo/geoip.dat b/ruleset/builtins/geo/geoip.dat index a1c108d..9ddb321 100644 Binary files a/ruleset/builtins/geo/geoip.dat and b/ruleset/builtins/geo/geoip.dat differ