Files
Mellaris/app.go
T
hayzam 7a3f6e945d Improves flow handling and adds runtime stats APIs
Refactors TCP and UDP flow managers to enhance analyzer selection and flow binding accuracy, including O(1) UDP stream rebinding by 5-tuple.
Introduces runtime stats tracking for engine and ruleset operations, exposing new APIs for granular performance and error metrics.
Optimizes GeoMatcher with result caching and supports efficient geosite set matching, reducing redundant computation in ruleset expressions.
2026-05-13 06:10:38 +05:30

227 lines
6.3 KiB
Go

package mellaris
import (
"context"
"errors"
"fmt"
"runtime"
"git.difuse.io/Difuse/Mellaris/analyzer"
"git.difuse.io/Difuse/Mellaris/engine"
gfwio "git.difuse.io/Difuse/Mellaris/io"
"git.difuse.io/Difuse/Mellaris/modifier"
"git.difuse.io/Difuse/Mellaris/ruleset"
)
// App bundles a running engine with everything needed to recompile and swap
// its ruleset later on.
type App struct {
	// engine is the instance built by New; Run and UpdateRules delegate to it.
	engine engine.Engine
	// io is the PacketIO handed to the engine; Close closes it.
	io gfwio.PacketIO

	// rulesetConfig is reused for every ruleset compilation (New, UpdateRules).
	rulesetConfig *ruleset.BuiltinConfig
	// ruleset is the ruleset compiled by New. UpdateRules does not reassign it.
	ruleset ruleset.Ruleset

	// analyzers and modifiers are the normalized lists passed to the compiler.
	analyzers []analyzer.Analyzer
	modifiers []modifier.Modifier

	// rulesFile is the YAML path used by ReloadRules; empty when rules were
	// supplied inline.
	rulesFile string
}
// New constructs an App from cfg and opts.
//
// Rules are resolved first, so a rules problem is reported before any packet
// I/O is opened. Nil analyzer/modifier slices are replaced by the package
// defaults and nil loggers by no-op loggers. When cfg.IO.PacketIO is nil, an
// NFQueue-backed PacketIO is created from cfg.IO; that instance is closed
// again if a later construction step fails. A caller-supplied PacketIO is
// never closed by New.
//
// A failure to create the PacketIO is returned as ConfigError{Field: "io"};
// errors from rule resolution, ruleset compilation and engine construction
// are returned unchanged.
func New(cfg Config, opts Options) (*App, error) {
	exprRules, rulesPath, err := resolveRules(opts)
	if err != nil {
		return nil, err
	}

	activeAnalyzers := normalizeAnalyzers(opts.Analyzers)
	activeModifiers := normalizeModifiers(opts.Modifiers)

	engLog := opts.EngineLogger
	if engLog == nil {
		engLog = noopEngineLogger{}
	}
	rsLog := opts.RulesetLogger
	if rsLog == nil {
		rsLog = noopRulesetLogger{}
	}

	workers := effectiveWorkerCount(cfg.Workers.Count)

	pio := cfg.IO.PacketIO
	createdHere := pio == nil
	if createdHere {
		// The queue count falls back to the worker count when not configured.
		queues := cfg.IO.NumQueues
		if queues <= 0 {
			queues = workers
		}
		pio, err = gfwio.NewNFQueuePacketIO(gfwio.NFQueuePacketIOConfig{
			QueueSize:    cfg.IO.QueueSize,
			ReadBuffer:   cfg.IO.ReadBuffer,
			WriteBuffer:  cfg.IO.WriteBuffer,
			Local:        cfg.IO.Local,
			RST:          cfg.IO.RST,
			NumQueues:    queues,
			MaxPacketLen: cfg.IO.MaxPacketLen,
		})
		if err != nil {
			return nil, ConfigError{Field: "io", Err: err}
		}
	}

	// fail releases the PacketIO only if New opened it, then reports cause.
	fail := func(cause error) (*App, error) {
		if createdHere {
			// Best-effort close: the construction error is what matters to
			// the caller, so the Close error is intentionally dropped.
			_ = pio.Close()
		}
		return nil, cause
	}

	compileCfg := &ruleset.BuiltinConfig{
		Logger:               rsLog,
		GeoSiteFilename:      cfg.Ruleset.GeoSite,
		GeoIpFilename:        cfg.Ruleset.GeoIp,
		ProtectedDialContext: pio.ProtectedDialContext,
	}
	compiled, err := ruleset.CompileExprRules(exprRules, activeAnalyzers, activeModifiers, compileCfg)
	if err != nil {
		return fail(err)
	}

	eng, err := engine.NewEngine(engine.Config{
		Logger:                           engLog,
		IO:                               pio,
		Ruleset:                          compiled,
		Workers:                          workers,
		WorkerQueueSize:                  cfg.Workers.QueueSize,
		WorkerTCPMaxBufferedPagesTotal:   cfg.Workers.TCPMaxBufferedPagesTotal,
		WorkerTCPMaxBufferedPagesPerConn: cfg.Workers.TCPMaxBufferedPagesPerConn,
		WorkerUDPMaxStreams:              cfg.Workers.UDPMaxStreams,
		OverflowPolicy:                   cfg.Workers.OverflowPolicy,
		AnalyzerSelectionMode:            cfg.Workers.AnalyzerSelectionMode,
	})
	if err != nil {
		return fail(err)
	}

	app := &App{
		engine:        eng,
		io:            pio,
		rulesetConfig: compileCfg,
		ruleset:       compiled,
		analyzers:     activeAnalyzers,
		modifiers:     activeModifiers,
		rulesFile:     rulesPath,
	}
	return app, nil
}
// Run hands ctx to the underlying engine's Run method and returns whatever
// that call returns.
func (a *App) Run(ctx context.Context) error {
	eng := a.engine
	return eng.Run(ctx)
}
// Close closes the PacketIO held by the App and returns the result of that
// Close call. It is a no-op returning nil when the receiver or its PacketIO
// is nil.
func (a *App) Close() error {
	if a != nil && a.io != nil {
		return a.io.Close()
	}
	return nil
}
// ReloadRules re-reads the rules file recorded at construction time and
// applies the result through UpdateRules.
//
// It returns a ConfigError for field "rules" when the App has no rules file,
// and a wrapped error naming the file when the YAML cannot be loaded.
func (a *App) ReloadRules() error {
	path := a.rulesFile
	if len(path) == 0 {
		return ConfigError{Field: "rules", Err: errors.New("rules file not set")}
	}

	parsed, loadErr := ruleset.ExprRulesFromYAML(path)
	if loadErr != nil {
		return fmt.Errorf("load rules file %q: %w", path, loadErr)
	}
	return a.UpdateRules(parsed)
}
// UpdateRules compiles rules with the App's analyzers, modifiers and ruleset
// config, then passes the compiled ruleset to the engine's UpdateRuleset.
// A compile error is returned as-is and the engine is not touched.
func (a *App) UpdateRules(rules []ruleset.ExprRule) error {
	compiled, err := ruleset.CompileExprRules(rules, a.analyzers, a.modifiers, a.rulesetConfig)
	if err == nil {
		err = a.engine.UpdateRuleset(compiled)
	}
	return err
}
// Engine exposes the engine instance this App was built around.
func (a *App) Engine() engine.Engine {
	eng := a.engine
	return eng
}
// effectiveWorkerCount returns configured when it is positive. Otherwise it
// falls back to the current GOMAXPROCS setting, or to 1 should that value
// not be positive.
func effectiveWorkerCount(configured int) int {
	if configured <= 0 {
		// GOMAXPROCS(0) only queries the current setting; it does not change it.
		if procs := runtime.GOMAXPROCS(0); procs > 0 {
			return procs
		}
		return 1
	}
	return configured
}
// resolveRules picks the rule source from opts and returns the rules together
// with the rules file path they came from (empty for inline rules).
//
// Exactly one of opts.RulesFile and opts.Rules must be set; supplying both or
// neither yields a ConfigError for field "rules". A file that fails to load
// yields a wrapped error naming the file, and the path is still returned in
// the second result.
func resolveRules(opts Options) ([]ruleset.ExprRule, string, error) {
	fromFile := opts.RulesFile != ""
	inline := len(opts.Rules) > 0

	switch {
	case fromFile && inline:
		return nil, "", ConfigError{Field: "rules", Err: errors.New("use either RulesFile or Rules")}
	case fromFile:
		loaded, err := ruleset.ExprRulesFromYAML(opts.RulesFile)
		if err != nil {
			return nil, opts.RulesFile, fmt.Errorf("load rules file %q: %w", opts.RulesFile, err)
		}
		return loaded, opts.RulesFile, nil
	case inline:
		return opts.Rules, "", nil
	default:
		return nil, "", ConfigError{Field: "rules", Err: errors.New("no rules provided")}
	}
}
// normalizeAnalyzers returns DefaultAnalyzers() for a nil slice and otherwise
// a fresh copy of in, so the result never shares a backing array with the
// caller's slice. A non-nil empty slice stays empty (no defaults applied).
func normalizeAnalyzers(in []analyzer.Analyzer) []analyzer.Analyzer {
	if in != nil {
		cloned := make([]analyzer.Analyzer, 0, len(in))
		return append(cloned, in...)
	}
	return DefaultAnalyzers()
}
// normalizeModifiers returns DefaultModifiers() for a nil slice and otherwise
// a fresh copy of in, so the result never shares a backing array with the
// caller's slice. A non-nil empty slice stays empty (no defaults applied).
func normalizeModifiers(in []modifier.Modifier) []modifier.Modifier {
	if in != nil {
		cloned := make([]modifier.Modifier, 0, len(in))
		return append(cloned, in...)
	}
	return DefaultModifiers()
}
// noopEngineLogger is the engine logger New installs when
// Options.EngineLogger is nil. Every method discards its arguments.
type noopEngineLogger struct{}

// Worker lifecycle events.

func (noopEngineLogger) WorkerStart(_ int) {}
func (noopEngineLogger) WorkerStop(_ int)  {}

// TCP stream events.

func (noopEngineLogger) TCPStreamNew(_ int, _ ruleset.StreamInfo)                       {}
func (noopEngineLogger) TCPStreamPropUpdate(_ ruleset.StreamInfo, _ bool)               {}
func (noopEngineLogger) TCPStreamAction(_ ruleset.StreamInfo, _ ruleset.Action, _ bool) {}

// UDP stream events.

func (noopEngineLogger) UDPStreamNew(_ int, _ ruleset.StreamInfo)                       {}
func (noopEngineLogger) UDPStreamPropUpdate(_ ruleset.StreamInfo, _ bool)               {}
func (noopEngineLogger) UDPStreamAction(_ ruleset.StreamInfo, _ ruleset.Action, _ bool) {}

// Modifier errors.

func (noopEngineLogger) ModifyError(_ ruleset.StreamInfo, _ error) {}

// Analyzer log output at the three levels.

func (noopEngineLogger) AnalyzerDebugf(_ int64, _ string, _ string, _ ...interface{}) {}
func (noopEngineLogger) AnalyzerInfof(_ int64, _ string, _ string, _ ...interface{})  {}
func (noopEngineLogger) AnalyzerErrorf(_ int64, _ string, _ string, _ ...interface{}) {}
// noopRulesetLogger is the ruleset logger New installs when
// Options.RulesetLogger is nil. Every method discards its arguments.
type noopRulesetLogger struct{}

func (noopRulesetLogger) Log(_ ruleset.StreamInfo, _ string) {}

func (noopRulesetLogger) MatchError(_ ruleset.StreamInfo, _ string, _ error) {}