package engine import ( "net" "testing" "git.difuse.io/Difuse/Mellaris/analyzer" "git.difuse.io/Difuse/Mellaris/ruleset" "github.com/bwmarrin/snowflake" "github.com/google/gopacket" "github.com/google/gopacket/layers" ) type legacyUDPStreamValue struct { IPFlow gopacket.Flow UDPFlow gopacket.Flow } type emptyRuleset struct{} func (emptyRuleset) Analyzers(ruleset.StreamInfo) []analyzer.Analyzer { return nil } func (emptyRuleset) Match(ruleset.StreamInfo) ruleset.MatchResult { return ruleset.MatchResult{Action: ruleset.ActionMaybe} } func benchmarkUDPManager(b *testing.B, churn bool) { node, err := snowflake.NewNode(0) if err != nil { b.Fatalf("create node: %v", err) } factory := &udpStreamFactory{WorkerID: 0, Logger: noopTestLogger{}, Node: node, Ruleset: emptyRuleset{}} mgr, err := newUDPStreamManager(factory, 200000, &statsCounters{}) if err != nil { b.Fatalf("new manager: %v", err) } const flowCount = 20000 flows := make([]gopacket.Flow, flowCount) udps := make([]*layers.UDP, flowCount) for i := 0; i < flowCount; i++ { a := byte(i >> 8) c := byte(i) flows[i] = gopacket.NewFlow(layers.EndpointIPv4, net.IPv4(10, a, 0, c).To4(), net.IPv4(172, 16, a, c).To4()) udps[i] = &layers.UDP{ SrcPort: layers.UDPPort(1024 + i%20000), DstPort: layers.UDPPort(20000 + (i*7)%20000), BaseLayer: layers.BaseLayer{Payload: []byte{0x01, 0x00, 0x00, 0x00}}, } } ctx := &udpContext{Verdict: udpVerdictAccept} b.ResetTimer() for i := 0; i < b.N; i++ { idx := i % flowCount streamID := uint32(idx + 1) if churn { streamID = uint32((i % flowCount) + 1 + ((i / flowCount) * flowCount)) } ctx.Verdict = udpVerdictAccept ctx.Packet = nil mgr.MatchWithContext(streamID, flows[idx], udps[idx], ctx) } } func BenchmarkUDPManagerMatchStableStreamID(b *testing.B) { benchmarkUDPManager(b, false) } func BenchmarkUDPManagerMatchStreamIDChurn(b *testing.B) { benchmarkUDPManager(b, true) } func BenchmarkLegacyUDPFallbackScanChurn(b *testing.B) { const flowCount = 5000 flows := make([]gopacket.Flow, flowCount) udps := make([]*layers.UDP, flowCount) for i := 0; i < flowCount; i++ { a := byte(i >> 8) c := byte(i) flows[i] = gopacket.NewFlow(layers.EndpointIPv4, net.IPv4(10, a, 0, c).To4(), net.IPv4(172, 16, a, c).To4()) udps[i] = &layers.UDP{ SrcPort: layers.UDPPort(1024 + i%20000), DstPort: layers.UDPPort(20000 + (i*7)%20000), BaseLayer: layers.BaseLayer{Payload: []byte{0x01, 0x00, 0x00, 0x00}}, } } streams := make(map[uint32]*legacyUDPStreamValue, flowCount) keys := make([]uint32, 0, flowCount) for i := 0; i < flowCount; i++ { streamID := uint32(i + 1) streams[streamID] = &legacyUDPStreamValue{ IPFlow: flows[i], UDPFlow: udps[i].TransportFlow(), } keys = append(keys, streamID) } b.ResetTimer() for i := 0; i < b.N; i++ { idx := i % flowCount streamID := uint32((i % flowCount) + 1 + ((i / flowCount) * flowCount)) if _, ok := streams[streamID]; ok { continue } ipFlow := flows[idx] udpFlow := udps[idx].TransportFlow() for _, k := range keys { v, ok := streams[k] if !ok || v == nil { continue } if (v.IPFlow == ipFlow && v.UDPFlow == udpFlow) || (v.IPFlow == ipFlow.Reverse() && v.UDPFlow == udpFlow.Reverse()) { delete(streams, k) streams[streamID] = v break } } } }