7a3f6e945d
Refactors TCP and UDP flow managers to enhance analyzer selection and flow binding accuracy, including O(1) UDP stream rebinding by 5-tuple. Introduces runtime stats tracking for engine and ruleset operations, exposing new APIs for granular performance and error metrics. Optimizes GeoMatcher with result caching and supports efficient geosite set matching, reducing redundant computation in ruleset expressions.
227 lines
6.3 KiB
Go
227 lines
6.3 KiB
Go
package mellaris
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"runtime"
|
|
|
|
"git.difuse.io/Difuse/Mellaris/analyzer"
|
|
"git.difuse.io/Difuse/Mellaris/engine"
|
|
gfwio "git.difuse.io/Difuse/Mellaris/io"
|
|
"git.difuse.io/Difuse/Mellaris/modifier"
|
|
"git.difuse.io/Difuse/Mellaris/ruleset"
|
|
)
|
|
|
|
// App owns the Mellaris engine and ruleset lifecycle.
|
|
type App struct {
|
|
engine engine.Engine
|
|
io gfwio.PacketIO
|
|
rulesetConfig *ruleset.BuiltinConfig
|
|
ruleset ruleset.Ruleset
|
|
analyzers []analyzer.Analyzer
|
|
modifiers []modifier.Modifier
|
|
rulesFile string
|
|
}
|
|
|
|
// New builds an App from config and options.
|
|
func New(cfg Config, opts Options) (*App, error) {
|
|
rules, rulesFile, err := resolveRules(opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
analyzers := normalizeAnalyzers(opts.Analyzers)
|
|
modifiers := normalizeModifiers(opts.Modifiers)
|
|
engineLogger := opts.EngineLogger
|
|
if engineLogger == nil {
|
|
engineLogger = noopEngineLogger{}
|
|
}
|
|
rulesetLogger := opts.RulesetLogger
|
|
if rulesetLogger == nil {
|
|
rulesetLogger = noopRulesetLogger{}
|
|
}
|
|
|
|
packetIO := cfg.IO.PacketIO
|
|
ownsIO := false
|
|
workerCount := effectiveWorkerCount(cfg.Workers.Count)
|
|
numQueues := cfg.IO.NumQueues
|
|
if numQueues <= 0 {
|
|
numQueues = workerCount
|
|
}
|
|
if packetIO == nil {
|
|
packetIO, err = gfwio.NewNFQueuePacketIO(gfwio.NFQueuePacketIOConfig{
|
|
QueueSize: cfg.IO.QueueSize,
|
|
ReadBuffer: cfg.IO.ReadBuffer,
|
|
WriteBuffer: cfg.IO.WriteBuffer,
|
|
Local: cfg.IO.Local,
|
|
RST: cfg.IO.RST,
|
|
NumQueues: numQueues,
|
|
MaxPacketLen: cfg.IO.MaxPacketLen,
|
|
})
|
|
if err != nil {
|
|
return nil, ConfigError{Field: "io", Err: err}
|
|
}
|
|
ownsIO = true
|
|
}
|
|
cleanup := func() {
|
|
if ownsIO {
|
|
_ = packetIO.Close()
|
|
}
|
|
}
|
|
|
|
rsConfig := &ruleset.BuiltinConfig{
|
|
Logger: rulesetLogger,
|
|
GeoSiteFilename: cfg.Ruleset.GeoSite,
|
|
GeoIpFilename: cfg.Ruleset.GeoIp,
|
|
ProtectedDialContext: packetIO.ProtectedDialContext,
|
|
}
|
|
rs, err := ruleset.CompileExprRules(rules, analyzers, modifiers, rsConfig)
|
|
if err != nil {
|
|
cleanup()
|
|
return nil, err
|
|
}
|
|
|
|
engCfg := engine.Config{
|
|
Logger: engineLogger,
|
|
IO: packetIO,
|
|
Ruleset: rs,
|
|
Workers: workerCount,
|
|
WorkerQueueSize: cfg.Workers.QueueSize,
|
|
WorkerTCPMaxBufferedPagesTotal: cfg.Workers.TCPMaxBufferedPagesTotal,
|
|
WorkerTCPMaxBufferedPagesPerConn: cfg.Workers.TCPMaxBufferedPagesPerConn,
|
|
WorkerUDPMaxStreams: cfg.Workers.UDPMaxStreams,
|
|
OverflowPolicy: cfg.Workers.OverflowPolicy,
|
|
AnalyzerSelectionMode: cfg.Workers.AnalyzerSelectionMode,
|
|
}
|
|
eng, err := engine.NewEngine(engCfg)
|
|
if err != nil {
|
|
cleanup()
|
|
return nil, err
|
|
}
|
|
|
|
return &App{
|
|
engine: eng,
|
|
io: packetIO,
|
|
rulesetConfig: rsConfig,
|
|
ruleset: rs,
|
|
analyzers: analyzers,
|
|
modifiers: modifiers,
|
|
rulesFile: rulesFile,
|
|
}, nil
|
|
}
|
|
|
|
// Run starts the engine and blocks until it exits or ctx is cancelled.
|
|
func (a *App) Run(ctx context.Context) error {
|
|
return a.engine.Run(ctx)
|
|
}
|
|
|
|
// Close releases the underlying PacketIO.
|
|
func (a *App) Close() error {
|
|
if a == nil || a.io == nil {
|
|
return nil
|
|
}
|
|
return a.io.Close()
|
|
}
|
|
|
|
// ReloadRules reloads rules from the configured rules file.
|
|
func (a *App) ReloadRules() error {
|
|
if a.rulesFile == "" {
|
|
return ConfigError{Field: "rules", Err: errors.New("rules file not set")}
|
|
}
|
|
rules, err := ruleset.ExprRulesFromYAML(a.rulesFile)
|
|
if err != nil {
|
|
return fmt.Errorf("load rules file %q: %w", a.rulesFile, err)
|
|
}
|
|
return a.UpdateRules(rules)
|
|
}
|
|
|
|
// UpdateRules compiles the provided rules and updates the running engine.
|
|
func (a *App) UpdateRules(rules []ruleset.ExprRule) error {
|
|
rs, err := ruleset.CompileExprRules(rules, a.analyzers, a.modifiers, a.rulesetConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return a.engine.UpdateRuleset(rs)
|
|
}
|
|
|
|
// Engine returns the underlying engine instance.
|
|
func (a *App) Engine() engine.Engine {
|
|
return a.engine
|
|
}
|
|
|
|
// effectiveWorkerCount picks the number of workers to run.
//
// A positive configured value is used verbatim. Anything else falls back to
// the current GOMAXPROCS setting (queried without changing it), with 1 as a
// last-resort floor should that value not be positive.
func effectiveWorkerCount(configured int) int {
	if configured > 0 {
		return configured
	}
	if procs := runtime.GOMAXPROCS(0); procs > 0 {
		return procs
	}
	return 1
}
|
|
|
|
func resolveRules(opts Options) ([]ruleset.ExprRule, string, error) {
|
|
if opts.RulesFile != "" && len(opts.Rules) > 0 {
|
|
return nil, "", ConfigError{Field: "rules", Err: errors.New("use either RulesFile or Rules")}
|
|
}
|
|
if opts.RulesFile != "" {
|
|
rules, err := ruleset.ExprRulesFromYAML(opts.RulesFile)
|
|
if err != nil {
|
|
return nil, opts.RulesFile, fmt.Errorf("load rules file %q: %w", opts.RulesFile, err)
|
|
}
|
|
return rules, opts.RulesFile, nil
|
|
}
|
|
if len(opts.Rules) > 0 {
|
|
return opts.Rules, "", nil
|
|
}
|
|
return nil, "", ConfigError{Field: "rules", Err: errors.New("no rules provided")}
|
|
}
|
|
|
|
func normalizeAnalyzers(in []analyzer.Analyzer) []analyzer.Analyzer {
|
|
if in == nil {
|
|
return DefaultAnalyzers()
|
|
}
|
|
out := make([]analyzer.Analyzer, len(in))
|
|
copy(out, in)
|
|
return out
|
|
}
|
|
|
|
func normalizeModifiers(in []modifier.Modifier) []modifier.Modifier {
|
|
if in == nil {
|
|
return DefaultModifiers()
|
|
}
|
|
out := make([]modifier.Modifier, len(in))
|
|
copy(out, in)
|
|
return out
|
|
}
|
|
|
|
type noopEngineLogger struct{}
|
|
|
|
func (noopEngineLogger) WorkerStart(id int) {}
|
|
func (noopEngineLogger) WorkerStop(id int) {}
|
|
|
|
func (noopEngineLogger) TCPStreamNew(workerID int, info ruleset.StreamInfo) {}
|
|
func (noopEngineLogger) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool) {}
|
|
func (noopEngineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
|
|
}
|
|
|
|
func (noopEngineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) {}
|
|
func (noopEngineLogger) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool) {}
|
|
func (noopEngineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) {
|
|
}
|
|
|
|
func (noopEngineLogger) ModifyError(info ruleset.StreamInfo, err error) {}
|
|
|
|
func (noopEngineLogger) AnalyzerDebugf(streamID int64, name string, format string, args ...interface{}) {
|
|
}
|
|
func (noopEngineLogger) AnalyzerInfof(streamID int64, name string, format string, args ...interface{}) {
|
|
}
|
|
func (noopEngineLogger) AnalyzerErrorf(streamID int64, name string, format string, args ...interface{}) {
|
|
}
|
|
|
|
type noopRulesetLogger struct{}
|
|
|
|
func (noopRulesetLogger) Log(info ruleset.StreamInfo, name string) {}
|
|
func (noopRulesetLogger) MatchError(info ruleset.StreamInfo, name string, err error) {}
|