Files
Mellaris/engine/udp_manager_bench_test.go
T
hayzam 7a3f6e945d Improves flow handling and adds runtime stats APIs
Refactors TCP and UDP flow managers to enhance analyzer selection and flow binding accuracy, including O(1) UDP stream rebinding by 5-tuple.
Introduces runtime stats tracking for engine and ruleset operations, exposing new APIs for granular performance and error metrics.
Optimizes GeoMatcher with result caching and supports efficient geosite set matching, reducing redundant computation in ruleset expressions.
2026-05-13 06:10:38 +05:30

123 lines
3.3 KiB
Go

package engine
import (
"net"
"testing"
"git.difuse.io/Difuse/Mellaris/analyzer"
"git.difuse.io/Difuse/Mellaris/ruleset"
"github.com/bwmarrin/snowflake"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
)
type legacyUDPStreamValue struct {
IPFlow gopacket.Flow
UDPFlow gopacket.Flow
}
type emptyRuleset struct{}
func (emptyRuleset) Analyzers(ruleset.StreamInfo) []analyzer.Analyzer { return nil }
func (emptyRuleset) Match(ruleset.StreamInfo) ruleset.MatchResult {
return ruleset.MatchResult{Action: ruleset.ActionMaybe}
}
func benchmarkUDPManager(b *testing.B, churn bool) {
node, err := snowflake.NewNode(0)
if err != nil {
b.Fatalf("create node: %v", err)
}
factory := &udpStreamFactory{WorkerID: 0, Logger: noopTestLogger{}, Node: node, Ruleset: emptyRuleset{}}
mgr, err := newUDPStreamManager(factory, 200000, &statsCounters{})
if err != nil {
b.Fatalf("new manager: %v", err)
}
const flowCount = 20000
flows := make([]gopacket.Flow, flowCount)
udps := make([]*layers.UDP, flowCount)
for i := 0; i < flowCount; i++ {
a := byte(i >> 8)
c := byte(i)
flows[i] = gopacket.NewFlow(layers.EndpointIPv4, net.IPv4(10, a, 0, c).To4(), net.IPv4(172, 16, a, c).To4())
udps[i] = &layers.UDP{
SrcPort: layers.UDPPort(1024 + i%20000),
DstPort: layers.UDPPort(20000 + (i*7)%20000),
BaseLayer: layers.BaseLayer{Payload: []byte{0x01, 0x00, 0x00, 0x00}},
}
}
ctx := &udpContext{Verdict: udpVerdictAccept}
b.ResetTimer()
for i := 0; i < b.N; i++ {
idx := i % flowCount
streamID := uint32(idx + 1)
if churn {
streamID = uint32((i % flowCount) + 1 + ((i / flowCount) * flowCount))
}
ctx.Verdict = udpVerdictAccept
ctx.Packet = nil
mgr.MatchWithContext(streamID, flows[idx], udps[idx], ctx)
}
}
func BenchmarkUDPManagerMatchStableStreamID(b *testing.B) {
benchmarkUDPManager(b, false)
}
func BenchmarkUDPManagerMatchStreamIDChurn(b *testing.B) {
benchmarkUDPManager(b, true)
}
func BenchmarkLegacyUDPFallbackScanChurn(b *testing.B) {
const flowCount = 5000
flows := make([]gopacket.Flow, flowCount)
udps := make([]*layers.UDP, flowCount)
for i := 0; i < flowCount; i++ {
a := byte(i >> 8)
c := byte(i)
flows[i] = gopacket.NewFlow(layers.EndpointIPv4, net.IPv4(10, a, 0, c).To4(), net.IPv4(172, 16, a, c).To4())
udps[i] = &layers.UDP{
SrcPort: layers.UDPPort(1024 + i%20000),
DstPort: layers.UDPPort(20000 + (i*7)%20000),
BaseLayer: layers.BaseLayer{Payload: []byte{0x01, 0x00, 0x00, 0x00}},
}
}
streams := make(map[uint32]*legacyUDPStreamValue, flowCount)
keys := make([]uint32, 0, flowCount)
for i := 0; i < flowCount; i++ {
streamID := uint32(i + 1)
streams[streamID] = &legacyUDPStreamValue{
IPFlow: flows[i],
UDPFlow: udps[i].TransportFlow(),
}
keys = append(keys, streamID)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
idx := i % flowCount
streamID := uint32((i % flowCount) + 1 + ((i / flowCount) * flowCount))
if _, ok := streams[streamID]; ok {
continue
}
ipFlow := flows[idx]
udpFlow := udps[idx].TransportFlow()
for _, k := range keys {
v, ok := streams[k]
if !ok || v == nil {
continue
}
if (v.IPFlow == ipFlow && v.UDPFlow == udpFlow) ||
(v.IPFlow == ipFlow.Reverse() && v.UDPFlow == udpFlow.Reverse()) {
delete(streams, k)
streams[streamID] = v
break
}
}
}
}