diff --git a/plugin/executable/mikrotik_addresslist/mikrotik_addresslist.go b/plugin/executable/mikrotik_addresslist/mikrotik_addresslist.go index 14e2c24..0dbe1bf 100644 --- a/plugin/executable/mikrotik_addresslist/mikrotik_addresslist.go +++ b/plugin/executable/mikrotik_addresslist/mikrotik_addresslist.go @@ -50,6 +50,7 @@ type Args struct { Comment string `yaml:"comment"` // 添加的地址的注释 TimeoutAddr int `yaml:"timeout_addr"` // 地址超时时间(秒),0 表示永久 CacheTTL int `yaml:"cache_ttl"` // 缓存 TTL(秒),默认 3600(1小时) + VerifyAdd bool `yaml:"verify_add"` // 是否在添加后验证地址确实存在,默认 false } var _ sequence.Executable = (*mikrotikAddressListPlugin)(nil) @@ -59,8 +60,8 @@ func Init(bp *coremain.BP, args any) (any, error) { return newMikrotikAddressListPlugin(args.(*Args)) } -// QuickSetup format: host:port:username:password:use_tls:timeout:address_list4:address_list6:mask4:mask6:comment:timeout_addr:cache_ttl -// e.g. "192.168.1.1:8728:admin:password:false:10:my_list4:my_list6:24:32:from_dns:3600:3600" +// QuickSetup format: host:port:username:password:use_tls:timeout:address_list4:address_list6:mask4:mask6:comment:timeout_addr:cache_ttl:verify_add +// e.g. "192.168.1.1:8728:admin:password:false:10:my_list4:my_list6:24:32:from_dns:3600:3600:true" func QuickSetup(_ sequence.BQ, s string) (any, error) { parts := strings.Split(s, ":") if len(parts) < 6 { @@ -137,5 +138,12 @@ func QuickSetup(_ sequence.BQ, s string) (any, error) { } } + // 解析验证开关 + if len(parts) > 13 { + if verifyAdd, err := strconv.ParseBool(parts[13]); err == nil { + args.VerifyAdd = verifyAdd + } + } + return newMikrotikAddressListPlugin(args) } diff --git a/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go b/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go index 0bf4acf..405a14d 100644 --- a/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go +++ b/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go @@ -35,20 +35,33 @@ import ( routeros "github.com/go-routeros/routeros/v3" ) +// verifyTask 验证任务 +type verifyTask struct { + listName string + cidrAddr string + retries int +} + type mikrotikAddressListPlugin struct { args *Args conn *routeros.Client log *zap.Logger // 并发控制 - workerPool chan struct{} - wg sync.WaitGroup - mu sync.RWMutex // 保护连接的重连操作 + workerPool chan struct{} + verifyPool chan struct{} // 专门用于验证的工作池 + wg sync.WaitGroup + mu sync.RWMutex // 保护连接的重连操作 + isConnected bool // 连接状态标记 // 内存缓存 cache map[string]time.Time // key: "listName:cidrAddr", value: 添加时间 cacheMu sync.RWMutex // 保护缓存访问 cacheTTL time.Duration // 缓存 TTL,默认 1 小时 + + // 验证队列 + verifyQueue chan verifyTask + stopVerify chan struct{} } func newMikrotikAddressListPlugin(args *Args) (*mikrotikAddressListPlugin, error) { @@ -82,6 +95,7 @@ func newMikrotikAddressListPlugin(args *Args) (*mikrotikAddressListPlugin, error // 设置工作池大小(可以根据需要调整) workerCount := 10 // 并发工作线程数 + verifyCount := 5 // 验证工作线程数 // 设置缓存 TTL cacheTTL := time.Hour // 默认 1 小时 @@ -90,12 +104,23 @@ func newMikrotikAddressListPlugin(args *Args) (*mikrotikAddressListPlugin, error } plugin := &mikrotikAddressListPlugin{ - args: args, - conn: conn, - log: zap.L().Named("mikrotik_addresslist"), - workerPool: make(chan struct{}, workerCount), - cache: make(map[string]time.Time), - cacheTTL: cacheTTL, + args: args, + conn: conn, + log: zap.L().Named("mikrotik_addresslist"), + workerPool: make(chan struct{}, workerCount), + verifyPool: make(chan struct{}, verifyCount), + cache: make(map[string]time.Time), + cacheTTL: cacheTTL, + isConnected: true, + verifyQueue: make(chan verifyTask, 100), // 验证任务队列 + stopVerify: make(chan struct{}), + } + + // 启动验证工作协程 + if args.VerifyAdd { + for i := 0; i < verifyCount; i++ { + go plugin.verifyWorker() + } } // 记录连接成功信息 @@ -105,6 +130,8 @@ func newMikrotikAddressListPlugin(args *Args) (*mikrotikAddressListPlugin, error zap.String("username", args.Username), zap.String("address_list4", args.AddressList4), zap.Int("worker_count", workerCount), + zap.Int("verify_count", verifyCount), + zap.Bool("verify_add", args.VerifyAdd), zap.Duration("cache_ttl", cacheTTL)) return plugin, nil @@ -226,11 +253,17 @@ func (p *mikrotikAddressListPlugin) Exec(_ context.Context, qCtx *query_context. r := qCtx.R() if r != nil { + // 获取查询的域名 + var domain string + if len(qCtx.Q().Question) > 0 { + domain = strings.TrimSuffix(qCtx.Q().Question[0].Name, ".") + } + p.log.Debug("processing DNS response", - zap.String("qname", qCtx.Q().Question[0].Name), + zap.String("qname", domain), zap.Int("answer_count", len(r.Answer))) - if err := p.addToAddressList(r); err != nil { + if err := p.addToAddressList(r, domain); err != nil { p.log.Error("failed to add addresses to MikroTik", zap.Error(err)) return fmt.Errorf("mikrotik_addresslist: %w", err) } @@ -239,6 +272,11 @@ func (p *mikrotikAddressListPlugin) Exec(_ context.Context, qCtx *query_context. } func (p *mikrotikAddressListPlugin) Close() error { + // 停止验证工作器 + if p.args.VerifyAdd { + close(p.stopVerify) + } + // 等待所有工作完成 p.wg.Wait() @@ -260,7 +298,7 @@ func (p *mikrotikAddressListPlugin) Close() error { return nil } -func (p *mikrotikAddressListPlugin) addToAddressList(r *dns.Msg) error { +func (p *mikrotikAddressListPlugin) addToAddressList(r *dns.Msg, domain string) error { p.log.Debug("starting to process DNS response", zap.String("configured_address_list4", p.args.AddressList4), zap.Int("answer_count", len(r.Answer))) @@ -303,7 +341,7 @@ func (p *mikrotikAddressListPlugin) addToAddressList(r *dns.Msg) error { p.adjustWorkerPoolSize(len(addresses)) // 使用优化的批量处理 - if err := p.batchAddAddresses(addresses, p.args.AddressList4, p.args.Mask4); err != nil { + if err := p.batchAddAddresses(addresses, p.args.AddressList4, p.args.Mask4, domain); err != nil { return err } @@ -317,7 +355,7 @@ func (p *mikrotikAddressListPlugin) addToAddressList(r *dns.Msg) error { return nil } -func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listName string, mask int) error { +func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listName string, mask int, domain string) error { p.log.Debug("addAddressToMikrotik called", zap.String("addr", addr.String()), zap.String("listName", listName), @@ -361,9 +399,13 @@ func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listNa "=address=" + cidrAddr, } - // 添加注释(如果配置了) - if p.args.Comment != "" { - params = append(params, "=comment="+p.args.Comment) + // 使用域名作为注释(优先级高于配置文件中的comment) + comment := domain + if comment == "" && p.args.Comment != "" { + comment = p.args.Comment + } + if comment != "" { + params = append(params, "=comment="+comment) } // 添加超时时间(如果配置了) @@ -374,7 +416,8 @@ func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listNa p.log.Info("adding address to MikroTik", zap.String("cidr", cidrAddr), zap.String("list", listName), - zap.String("comment", p.args.Comment), + zap.String("domain", domain), + zap.String("comment", comment), zap.Int("timeout", p.args.TimeoutAddr)) p.log.Debug("Add to list: ", zap.Strings("params", params)) @@ -450,6 +493,20 @@ func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listNa // 添加到缓存 p.addToCache(listName, cidrAddr) + // 如果启用了验证,提交验证任务 + if p.args.VerifyAdd { + select { + case p.verifyQueue <- verifyTask{ + listName: listName, + cidrAddr: cidrAddr, + retries: 0, + }: + p.log.Debug("verification task queued", zap.String("cidr", cidrAddr)) + default: + p.log.Warn("verification queue full, skipping verification", zap.String("cidr", cidrAddr)) + } + } + return nil } @@ -485,7 +542,7 @@ func (p *mikrotikAddressListPlugin) addressExists(listName, address string) (boo } // batchAddAddresses 批量添加地址到MikroTik(批量操作优化) -func (p *mikrotikAddressListPlugin) batchAddAddresses(addresses []netip.Addr, listName string, mask int) error { +func (p *mikrotikAddressListPlugin) batchAddAddresses(addresses []netip.Addr, listName string, mask int, domain string) error { if len(addresses) == 0 { return nil } @@ -518,7 +575,7 @@ func (p *mikrotikAddressListPlugin) batchAddAddresses(addresses []netip.Addr, li } for _, addr := range batch { - if err := p.addAddressToMikrotik(addr, listName, mask); err != nil { + if err := p.addAddressToMikrotik(addr, listName, mask, domain); err != nil { mu.Lock() errors = append(errors, err) mu.Unlock() @@ -585,3 +642,141 @@ func (p *mikrotikAddressListPlugin) isConnectionError(err error) bool { strings.Contains(errStr, "closed") || strings.Contains(errStr, "timeout") } + +// verifyWorker 验证工作器,独立处理验证任务,避免阻塞写入操作 +func (p *mikrotikAddressListPlugin) verifyWorker() { + p.wg.Add(1) + defer p.wg.Done() + + ticker := time.NewTicker(2 * time.Second) // 每2秒处理一批验证任务 + defer ticker.Stop() + + for { + select { + case <-p.stopVerify: + p.log.Info("verification worker stopping") + return + + case <-ticker.C: + // 批量处理验证任务 + p.processBatchVerification() + + case task := <-p.verifyQueue: + // 获取验证工作池槽位 + select { + case p.verifyPool <- struct{}{}: + go func(task verifyTask) { + defer func() { <-p.verifyPool }() + p.processVerificationTask(task) + }(task) + default: + // 验证池满,延迟处理 + go func() { + time.Sleep(time.Second) + select { + case p.verifyQueue <- task: + default: + p.log.Warn("failed to requeue verification task", + zap.String("cidr", task.cidrAddr)) + } + }() + } + } + } +} + +// processBatchVerification 批量处理验证队列中的任务 +func (p *mikrotikAddressListPlugin) processBatchVerification() { + var tasks []verifyTask + + // 收集最多10个任务进行批量处理 + for i := 0; i < 10; i++ { + select { + case task := <-p.verifyQueue: + tasks = append(tasks, task) + default: + break + } + } + + if len(tasks) == 0 { + return + } + + p.log.Debug("processing batch verification", zap.Int("task_count", len(tasks))) + + for _, task := range tasks { + select { + case p.verifyPool <- struct{}{}: + go func(task verifyTask) { + defer func() { <-p.verifyPool }() + p.processVerificationTask(task) + }(task) + default: + // 如果池满,重新排队 + select { + case p.verifyQueue <- task: + default: + p.log.Warn("verification queue full, dropping task", + zap.String("cidr", task.cidrAddr)) + } + } + } +} + +// processVerificationTask 处理单个验证任务 +func (p *mikrotikAddressListPlugin) processVerificationTask(task verifyTask) { + // 等待一段时间再验证,让MikroTik有时间处理 + time.Sleep(time.Duration(500+task.retries*200) * time.Millisecond) + + exists, err := p.addressExists(task.listName, task.cidrAddr) + if err != nil { + if task.retries < 3 { + // 重试 + task.retries++ + p.log.Debug("verification failed, retrying", + zap.String("cidr", task.cidrAddr), + zap.Int("retries", task.retries), + zap.Error(err)) + + select { + case p.verifyQueue <- task: + default: + p.log.Warn("failed to requeue verification task for retry", + zap.String("cidr", task.cidrAddr)) + } + } else { + p.log.Error("verification failed after max retries", + zap.String("cidr", task.cidrAddr), + zap.String("list", task.listName), + zap.Error(err)) + } + return + } + + if !exists { + p.log.Warn("address not found in MikroTik after add operation", + zap.String("cidr", task.cidrAddr), + zap.String("list", task.listName)) + + // 从缓存中移除,下次会重新尝试添加 + p.cacheMu.Lock() + key := p.cacheKey(task.listName, task.cidrAddr) + delete(p.cache, key) + p.cacheMu.Unlock() + + // 可以选择重新添加地址 + if task.retries < 2 { + p.log.Info("attempting to re-add address", + zap.String("cidr", task.cidrAddr), + zap.String("list", task.listName)) + + // 这里可以重新调用添加逻辑,但要避免无限循环 + // 暂时只记录警告,由下次DNS查询触发重新添加 + } + } else { + p.log.Debug("address verification successful", + zap.String("cidr", task.cidrAddr), + zap.String("list", task.listName)) + } +}