package engine

import (
	"net"
	"sync/atomic"
	"testing"

	"git.difuse.io/Difuse/Mellaris/analyzer"
	"git.difuse.io/Difuse/Mellaris/ruleset"
	"github.com/bwmarrin/snowflake"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
)

// countingRuleset is a ruleset stub that hands out a fixed list of analyzers
// and always answers Match with ruleset.ActionMaybe.
type countingRuleset struct {
	ans []analyzer.Analyzer
}

// Analyzers returns the preconfigured analyzer list, ignoring the stream info.
func (rs countingRuleset) Analyzers(ruleset.StreamInfo) []analyzer.Analyzer {
	return rs.ans
}

// Match always reports ruleset.ActionMaybe, ignoring the stream info.
func (rs countingRuleset) Match(ruleset.StreamInfo) ruleset.MatchResult {
	return ruleset.MatchResult{Action: ruleset.ActionMaybe}
}

// countingUDPAnalyzer is a UDP analyzer stub that counts how many times
// NewUDP is invoked through the shared newCalls counter.
type countingUDPAnalyzer struct {
	newCalls *atomic.Uint64
}

// Name returns the fixed analyzer name.
func (c countingUDPAnalyzer) Name() string {
	return "countudp"
}

// Limit always returns 0.
func (c countingUDPAnalyzer) Limit() int {
	return 0
}

// NewUDP bumps the shared counter and returns a do-nothing stream.
func (c countingUDPAnalyzer) NewUDP(analyzer.UDPInfo, analyzer.Logger) analyzer.UDPStream {
	c.newCalls.Add(1)
	return countingUDPStream{}
}

// countingUDPStream is a stream stub that never produces property updates.
type countingUDPStream struct{}

// Feed discards its input and returns (nil, false).
func (countingUDPStream) Feed(bool, []byte) (*analyzer.PropUpdate, bool) {
	return nil, false
}

// Close returns nil.
func (countingUDPStream) Close(bool) *analyzer.PropUpdate {
	return nil
}

// TestUDPStreamManagerRebindsByTupleInO1Path feeds the same IP flow and UDP
// layer to the manager twice, varying only the first argument of
// MatchWithContext, and checks that the analyzer's NewUDP was called exactly
// once overall, i.e. the second call reused the stream created by the first.
func TestUDPStreamManagerRebindsByTupleInO1Path(t *testing.T) {
	sfNode, err := snowflake.NewNode(0)
	if err != nil {
		t.Fatalf("create node: %v", err)
	}

	created := new(atomic.Uint64)
	rules := countingRuleset{
		ans: []analyzer.Analyzer{
			countingUDPAnalyzer{newCalls: created},
		},
	}

	// NOTE(review): the meaning of 64 is not visible here — presumably a
	// stream-table capacity; confirm against newUDPStreamManager.
	manager, err := newUDPStreamManager(
		&udpStreamFactory{
			WorkerID: 0,
			Logger:   noopTestLogger{},
			Node:     sfNode,
			Ruleset:  rules,
		},
		64,
		&statsCounters{},
	)
	if err != nil {
		t.Fatalf("new manager: %v", err)
	}

	srcIP := net.IPv4(10, 0, 0, 1).To4()
	dstIP := net.IPv4(10, 0, 0, 2).To4()
	flow := gopacket.NewFlow(layers.EndpointIPv4, srcIP, dstIP)
	datagram := &layers.UDP{
		SrcPort:   50000,
		DstPort:   443,
		BaseLayer: layers.BaseLayer{Payload: []byte{0x01, 0x00, 0x00, 0x00}},
	}

	// requireSingleStream fails the test with the given format unless exactly
	// one stream has been created so far.
	requireSingleStream := func(format string) {
		t.Helper()
		if got := created.Load(); got != 1 {
			t.Fatalf(format, got)
		}
	}

	// First packet: a stream must be created for the tuple.
	manager.MatchWithContext(100, flow, datagram, &udpContext{Verdict: udpVerdictAccept})
	requireSingleStream("new stream calls=%d want=1")

	// Same tuple under a different first argument (presumably a stream ID —
	// confirm against MatchWithContext): no additional stream is expected.
	manager.MatchWithContext(200, flow, datagram, &udpContext{Verdict: udpVerdictAccept})
	requireSingleStream("expected stream reuse by tuple, new stream calls=%d want=1")
}