diff --git a/dns.yaml b/dns.yaml index 9a90295..d56d21e 100644 --- a/dns.yaml +++ b/dns.yaml @@ -50,9 +50,10 @@ plugins: username: "admin" password: "szn0s!nw@pwd()" use_tls: false - timeout: 10 - address_list4: "gfw" # 改为 gfw,插件会自动创建这个地址列表 + timeout: 5 # 减少连接超时时间 + address_list4: "gfw" # 改为 gfw,插件会自动创建这个地址列表 mask4: 24 comment: "amazon_domain" timeout_addr: 86400 - cache_ttl: 86400 \ No newline at end of file + cache_ttl: 86400 + verify_add: false # 关闭验证,提升性能 \ No newline at end of file diff --git a/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go b/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go index 405a14d..d9d9b11 100644 --- a/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go +++ b/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go @@ -248,7 +248,8 @@ func (p *mikrotikAddressListPlugin) Exec(_ context.Context, qCtx *query_context. // 检查连接是否正常 if p.conn == nil { p.log.Error("MikroTik connection is nil") - return fmt.Errorf("mikrotik_addresslist: connection is nil") + // 不返回错误,避免影响DNS响应 + return nil } r := qCtx.R() @@ -263,10 +264,13 @@ func (p *mikrotikAddressListPlugin) Exec(_ context.Context, qCtx *query_context. zap.String("qname", domain), zap.Int("answer_count", len(r.Answer))) - if err := p.addToAddressList(r, domain); err != nil { - p.log.Error("failed to add addresses to MikroTik", zap.Error(err)) - return fmt.Errorf("mikrotik_addresslist: %w", err) - } + // 异步处理Mikrotik操作,不阻塞DNS响应 + go func(response *dns.Msg, domainName string) { + if err := p.addToAddressList(response, domainName); err != nil { + p.log.Error("failed to add addresses to MikroTik", zap.Error(err)) + // 不返回错误到主流程,避免影响DNS响应 + } + }(r, domain) } return nil } @@ -337,21 +341,31 @@ func (p *mikrotikAddressListPlugin) addToAddressList(r *dns.Msg, domain string) return nil } - // 动态调整工作池大小 - p.adjustWorkerPoolSize(len(addresses)) + // 立即记录并启动异步处理,不等待任何操作 + p.log.Debug("queuing addresses for async processing", + zap.Int("address_count", len(addresses)), + zap.String("domain", domain)) - // 使用优化的批量处理 - if err := p.batchAddAddresses(addresses, p.args.AddressList4, p.args.Mask4, domain); err != nil { - return err - } + // 异步处理所有地址,包括工作池调整和批量处理 + go func(addrs []netip.Addr, listName string, mask int, domainName string) { + // 在异步线程中调整工作池大小 + p.adjustWorkerPoolSize(len(addrs)) - // 记录缓存统计信息 - total, valid := p.getCacheStats() - p.log.Info("IPv4 addresses processed", - zap.Int("processed_count", len(addresses)), - zap.Int("cache_total", total), - zap.Int("cache_valid", valid)) + // 启动批量处理 + if err := p.batchAddAddresses(addrs, listName, mask, domainName); err != nil { + p.log.Error("async batch processing failed", zap.Error(err)) + } + // 记录缓存统计信息 + total, valid := p.getCacheStats() + p.log.Debug("async processing stats", + zap.Int("processed_count", len(addrs)), + zap.Int("cache_total", total), + zap.Int("cache_valid", valid), + zap.String("domain", domainName)) + }(addresses, p.args.AddressList4, p.args.Mask4, domain) + + // 立即返回,不等待任何异步操作 return nil } @@ -381,17 +395,9 @@ func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listNa return nil } - // 缓存中没有,检查 MikroTik 中是否已存在 - exists, err := p.addressExists(listName, cidrAddr) - if err != nil { - // 如果检查失败,可能是地址列表不存在,继续尝试添加 - p.log.Debug("failed to check if address exists in MikroTik, will try to add anyway", zap.Error(err)) - } else if exists { - // 地址已存在于 MikroTik 中,添加到缓存并跳过 - p.log.Debug("address already exists in MikroTik, adding to cache", zap.String("cidr", cidrAddr), zap.String("list", listName)) - p.addToCache(listName, cidrAddr) - return nil - } + // 跳过耗时的 MikroTik 存在性检查,直接尝试添加 + // 依赖 Mikrotik 的内置重复检查和错误处理 + p.log.Debug("skipping existence check, will attempt direct add", zap.String("cidr", cidrAddr), zap.String("list", listName)) // 构造 RouterOS 参数,注意必须以 = 开头! params := []string{ @@ -422,9 +428,10 @@ func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listNa p.log.Debug("Add to list: ", zap.Strings("params", params)) - // 发送到 RouterOS,带智能重试机制 - maxRetries := 3 - backoffDuration := 100 * time.Millisecond + // 发送到 RouterOS,优化重试机制以减少延迟 + maxRetries := 2 // 减少重试次数 + backoffDuration := 50 * time.Millisecond // 减少退避时间 + var err error // 声明 err 变量 for i := 0; i < maxRetries; i++ { // 使用读锁保护连接访问 @@ -441,7 +448,7 @@ func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listNa p.mu.Unlock() p.log.Error("failed to reconnect", zap.Error(err)) time.Sleep(backoffDuration) - backoffDuration *= 2 // 指数退避 + backoffDuration += 25 * time.Millisecond // 线性增加,减少总延迟 continue } conn = p.conn @@ -470,7 +477,7 @@ func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listNa // 指数退避 time.Sleep(backoffDuration) - backoffDuration *= 2 + backoffDuration += 25 * time.Millisecond // 线性增加,减少总延迟 continue } @@ -541,7 +548,7 @@ func (p *mikrotikAddressListPlugin) addressExists(listName, address string) (boo return exists, nil } -// batchAddAddresses 批量添加地址到MikroTik(批量操作优化) +// batchAddAddresses 批量添加地址到MikroTik(完全异步化) func (p *mikrotikAddressListPlugin) batchAddAddresses(addresses []netip.Addr, listName string, mask int, domain string) error { if len(addresses) == 0 { return nil @@ -549,10 +556,13 @@ func (p *mikrotikAddressListPlugin) batchAddAddresses(addresses []netip.Addr, li // 分批处理,每批10个地址 batchSize := 10 - var wg sync.WaitGroup - var mu sync.Mutex - var errors []error - successCount := 0 + totalBatches := (len(addresses) + batchSize - 1) / batchSize + + p.log.Info("starting async batch processing", + zap.Int("total_addresses", len(addresses)), + zap.Int("batch_size", batchSize), + zap.Int("total_batches", totalBatches), + zap.String("domain", domain)) for i := 0; i < len(addresses); i += batchSize { end := i + batchSize @@ -561,46 +571,46 @@ func (p *mikrotikAddressListPlugin) batchAddAddresses(addresses []netip.Addr, li } batch := addresses[i:end] - wg.Add(1) - - go func(batch []netip.Addr) { - defer wg.Done() + batchIndex := i/batchSize + 1 + // 异步处理每个批次,不等待完成 + go func(batch []netip.Addr, batchIdx int) { // 获取工作池槽位 select { case p.workerPool <- struct{}{}: defer func() { <-p.workerPool }() default: - p.log.Debug("worker pool full, processing batch directly") + p.log.Debug("worker pool full, processing batch directly", + zap.Int("batch", batchIdx)) } + successCount := 0 + errorCount := 0 + for _, addr := range batch { if err := p.addAddressToMikrotik(addr, listName, mask, domain); err != nil { - mu.Lock() - errors = append(errors, err) - mu.Unlock() + errorCount++ + p.log.Debug("failed to add address in batch", + zap.String("addr", addr.String()), + zap.Int("batch", batchIdx), + zap.Error(err)) } else { - mu.Lock() successCount++ - mu.Unlock() } } - }(batch) + + p.log.Debug("batch processing completed", + zap.Int("batch", batchIdx), + zap.Int("success_count", successCount), + zap.Int("error_count", errorCount), + zap.String("domain", domain)) + }(batch, batchIndex) } - wg.Wait() - - if len(errors) > 0 { - p.log.Error("batch processing completed with errors", - zap.Int("success_count", successCount), - zap.Int("error_count", len(errors)), - zap.Error(errors[0])) - return errors[0] - } - - p.log.Info("batch processing completed successfully", - zap.Int("success_count", successCount), - zap.Int("total_count", len(addresses))) + // 立即返回,不等待批次处理完成 + p.log.Debug("all batches queued for async processing", + zap.Int("total_addresses", len(addresses)), + zap.String("domain", domain)) return nil }