package mellaris import ( "context" "errors" "fmt" "git.difuse.io/Difuse/Mellaris/analyzer" "git.difuse.io/Difuse/Mellaris/engine" gfwio "git.difuse.io/Difuse/Mellaris/io" "git.difuse.io/Difuse/Mellaris/modifier" "git.difuse.io/Difuse/Mellaris/ruleset" ) // App owns the Mellaris engine and ruleset lifecycle. type App struct { engine engine.Engine io gfwio.PacketIO rulesetConfig *ruleset.BuiltinConfig analyzers []analyzer.Analyzer modifiers []modifier.Modifier rulesFile string } // New builds an App from config and options. func New(cfg Config, opts Options) (*App, error) { rules, rulesFile, err := resolveRules(opts) if err != nil { return nil, err } analyzers := normalizeAnalyzers(opts.Analyzers) modifiers := normalizeModifiers(opts.Modifiers) engineLogger := opts.EngineLogger if engineLogger == nil { engineLogger = noopEngineLogger{} } rulesetLogger := opts.RulesetLogger if rulesetLogger == nil { rulesetLogger = noopRulesetLogger{} } packetIO := cfg.IO.PacketIO ownsIO := false if packetIO == nil { packetIO, err = gfwio.NewNFQueuePacketIO(gfwio.NFQueuePacketIOConfig{ QueueSize: cfg.IO.QueueSize, ReadBuffer: cfg.IO.ReadBuffer, WriteBuffer: cfg.IO.WriteBuffer, Local: cfg.IO.Local, RST: cfg.IO.RST, }) if err != nil { return nil, ConfigError{Field: "io", Err: err} } ownsIO = true } cleanup := func() { if ownsIO { _ = packetIO.Close() } } rsConfig := &ruleset.BuiltinConfig{ Logger: rulesetLogger, GeoSiteFilename: cfg.Ruleset.GeoSite, GeoIpFilename: cfg.Ruleset.GeoIp, ProtectedDialContext: packetIO.ProtectedDialContext, } rs, err := ruleset.CompileExprRules(rules, analyzers, modifiers, rsConfig) if err != nil { cleanup() return nil, err } engCfg := engine.Config{ Logger: engineLogger, IO: packetIO, Ruleset: rs, Workers: cfg.Workers.Count, WorkerQueueSize: cfg.Workers.QueueSize, WorkerTCPMaxBufferedPagesTotal: cfg.Workers.TCPMaxBufferedPagesTotal, WorkerTCPMaxBufferedPagesPerConn: cfg.Workers.TCPMaxBufferedPagesPerConn, WorkerUDPMaxStreams: cfg.Workers.UDPMaxStreams, } eng, err := engine.NewEngine(engCfg) if err != nil { cleanup() return nil, err } return &App{ engine: eng, io: packetIO, rulesetConfig: rsConfig, analyzers: analyzers, modifiers: modifiers, rulesFile: rulesFile, }, nil } // Run starts the engine and blocks until it exits or ctx is cancelled. func (a *App) Run(ctx context.Context) error { return a.engine.Run(ctx) } // Close releases the underlying PacketIO. func (a *App) Close() error { if a == nil || a.io == nil { return nil } return a.io.Close() } // ReloadRules reloads rules from the configured rules file. func (a *App) ReloadRules() error { if a.rulesFile == "" { return ConfigError{Field: "rules", Err: errors.New("rules file not set")} } rules, err := ruleset.ExprRulesFromYAML(a.rulesFile) if err != nil { return fmt.Errorf("load rules file %q: %w", a.rulesFile, err) } return a.UpdateRules(rules) } // UpdateRules compiles the provided rules and updates the running engine. func (a *App) UpdateRules(rules []ruleset.ExprRule) error { rs, err := ruleset.CompileExprRules(rules, a.analyzers, a.modifiers, a.rulesetConfig) if err != nil { return err } return a.engine.UpdateRuleset(rs) } // Engine returns the underlying engine instance. func (a *App) Engine() engine.Engine { return a.engine } func resolveRules(opts Options) ([]ruleset.ExprRule, string, error) { if opts.RulesFile != "" && len(opts.Rules) > 0 { return nil, "", ConfigError{Field: "rules", Err: errors.New("use either RulesFile or Rules")} } if opts.RulesFile != "" { rules, err := ruleset.ExprRulesFromYAML(opts.RulesFile) if err != nil { return nil, opts.RulesFile, fmt.Errorf("load rules file %q: %w", opts.RulesFile, err) } return rules, opts.RulesFile, nil } if len(opts.Rules) > 0 { return opts.Rules, "", nil } return nil, "", ConfigError{Field: "rules", Err: errors.New("no rules provided")} } func normalizeAnalyzers(in []analyzer.Analyzer) []analyzer.Analyzer { if in == nil { return DefaultAnalyzers() } out := make([]analyzer.Analyzer, len(in)) copy(out, in) return out } func normalizeModifiers(in []modifier.Modifier) []modifier.Modifier { if in == nil { return DefaultModifiers() } out := make([]modifier.Modifier, len(in)) copy(out, in) return out } type noopEngineLogger struct{} func (noopEngineLogger) WorkerStart(id int) {} func (noopEngineLogger) WorkerStop(id int) {} func (noopEngineLogger) TCPStreamNew(workerID int, info ruleset.StreamInfo) {} func (noopEngineLogger) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool) {} func (noopEngineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) { } func (noopEngineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) {} func (noopEngineLogger) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool) {} func (noopEngineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) { } func (noopEngineLogger) ModifyError(info ruleset.StreamInfo, err error) {} func (noopEngineLogger) AnalyzerDebugf(streamID int64, name string, format string, args ...interface{}) { } func (noopEngineLogger) AnalyzerInfof(streamID int64, name string, format string, args ...interface{}) { } func (noopEngineLogger) AnalyzerErrorf(streamID int64, name string, format string, args ...interface{}) { } type noopRulesetLogger struct{} func (noopRulesetLogger) Log(info ruleset.StreamInfo, name string) {} func (noopRulesetLogger) MatchError(info ruleset.StreamInfo, name string, err error) {}