From 59a5ef4aae3e47ac610e0532daf03a5d3cc076e7 Mon Sep 17 00:00:00 2001 From: dengxiongjian Date: Mon, 4 Aug 2025 09:02:30 +0800 Subject: [PATCH] =?UTF-8?q?=20=20=E4=B8=BB=E8=A6=81=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E7=82=B9=EF=BC=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 连接管理优化 mikrotik_addresslist_impl.go:132 - 添加连接状态管理和重连锁机制 - 改进重连逻辑,防止并发重连 2. 缓存机制增强 mikrotik_addresslist_impl.go:162-202 - 优化缓存锁使用,避免死锁 - 添加缓存大小限制和LRU驱逐策略 - 定期清理过期缓存项 3. 智能重试机制 mikrotik_addresslist_impl.go:420 - 指数退避算法 - 更智能的连接错误识别 - 改进的错误处理 4. 动态并发控制 mikrotik_addresslist_impl.go:589 - 根据地址数量动态调整工作池大小 - 批量处理优化 5. 性能监控改进 - 更详细的日志记录 - 缓存统计信息 - 处理过程可观察性 --- dns.yaml | 3 +- .../mikrotik_addresslist/config_example.yaml | 104 ------ .../mikrotik_addresslist.go | 12 +- .../mikrotik_addresslist_impl.go | 311 ++++++++++++++---- 4 files changed, 255 insertions(+), 175 deletions(-) delete mode 100644 plugin/executable/mikrotik_addresslist/config_example.yaml diff --git a/dns.yaml b/dns.yaml index 3651db2..9a90295 100644 --- a/dns.yaml +++ b/dns.yaml @@ -54,4 +54,5 @@ plugins: address_list4: "gfw" # 改为 gfw,插件会自动创建这个地址列表 mask4: 24 comment: "amazon_domain" - timeout_addr: 86400 \ No newline at end of file + timeout_addr: 86400 + cache_ttl: 86400 \ No newline at end of file diff --git a/plugin/executable/mikrotik_addresslist/config_example.yaml b/plugin/executable/mikrotik_addresslist/config_example.yaml deleted file mode 100644 index e97e36d..0000000 --- a/plugin/executable/mikrotik_addresslist/config_example.yaml +++ /dev/null @@ -1,104 +0,0 @@ -# MikroTik Address List 插件配置示例 - -# 插件定义 -plugins: - # 转发插件 - 向上游 DNS 服务器查询 - - tag: forward_google - type: forward - args: - upstream: - - addr: "8.8.8.8:53" - - addr: "8.8.4.4:53" - - # MikroTik Address List 插件 - 将解析的 IP 添加到 MikroTik - - tag: mikrotik_blocklist - type: mikrotik_addresslist - args: "192.168.1.1:8728:admin:password:false:10:blocked_ips:blocked_ips6:24:32:blocked_domain:86400" - - # 序列插件 - 组合多个插件 - - tag: sequence_with_blocklist - type: sequence - args: - - exec: forward_google - - exec: mikrotik_blocklist - -# 服务器配置 -servers: - # UDP 服务器 - - exec: sequence_with_blocklist - args: - - sequence_with_blocklist - listeners: - - protocol: udp - addr: ":53" - - # TCP 服务器 - - exec: sequence_with_blocklist - args: - - sequence_with_blocklist - listeners: - - protocol: tcp - addr: ":53" - -# 日志配置 -log: - level: info - file: "mosdns.log" - -# 其他配置示例 - -# 1. 使用 YAML 格式的详细配置 -plugins: - - tag: mikrotik_detailed - type: mikrotik_addresslist - args: - host: "192.168.1.1" - port: 8728 - username: "admin" - password: "password" - use_tls: false - timeout: 10 - address_list4: "blocked_ips" - address_list6: "blocked_ips6" - mask4: 24 - mask6: 32 - comment: "blocked_domain" - timeout_addr: 86400 - -# 2. 多个 address list 配置 -plugins: - # 恶意域名列表 - - tag: mikrotik_malware - type: mikrotik_addresslist - args: "192.168.1.1:8728:admin:password:false:10:malware_ips:malware_ips6:24:32:malware:3600" - - # 广告域名列表 - - tag: mikrotik_ads - type: mikrotik_addresslist - args: "192.168.1.1:8728:admin:password:false:10:ads_ips:ads_ips6:24:32:ads:7200" - - # 组合序列 - - tag: sequence_all - type: sequence - args: - - exec: forward_google - - exec: mikrotik_malware - - exec: mikrotik_ads - -# 3. 使用 TLS 的安全配置 -plugins: - - tag: mikrotik_secure - type: mikrotik_addresslist - args: "192.168.1.1:8729:admin:password:true:15:secure_list:secure_list6:24:32:secure:1800" - -# 4. 不同掩码配置 -plugins: - # 精确 IP 匹配 - - tag: mikrotik_exact - type: mikrotik_addresslist - args: "192.168.1.1:8728:admin:password:false:10:exact_ips:exact_ips6:32:128:exact:3600" - - # 网段匹配 - - tag: mikrotik_network - type: mikrotik_addresslist - args: "192.168.1.1:8728:admin:password:false:10:network_ips:network_ips6:16:48:network:7200" \ No newline at end of file diff --git a/plugin/executable/mikrotik_addresslist/mikrotik_addresslist.go b/plugin/executable/mikrotik_addresslist/mikrotik_addresslist.go index a703f41..14e2c24 100644 --- a/plugin/executable/mikrotik_addresslist/mikrotik_addresslist.go +++ b/plugin/executable/mikrotik_addresslist/mikrotik_addresslist.go @@ -49,6 +49,7 @@ type Args struct { Mask6 int `yaml:"mask6"` // IPv6 掩码,默认 32 Comment string `yaml:"comment"` // 添加的地址的注释 TimeoutAddr int `yaml:"timeout_addr"` // 地址超时时间(秒),0 表示永久 + CacheTTL int `yaml:"cache_ttl"` // 缓存 TTL(秒),默认 3600(1小时) } var _ sequence.Executable = (*mikrotikAddressListPlugin)(nil) @@ -58,8 +59,8 @@ func Init(bp *coremain.BP, args any) (any, error) { return newMikrotikAddressListPlugin(args.(*Args)) } -// QuickSetup format: host:port:username:password:use_tls:timeout:address_list4:address_list6:mask4:mask6:comment:timeout_addr -// e.g. "192.168.1.1:8728:admin:password:false:10:my_list4:my_list6:24:32:from_dns:3600" +// QuickSetup format: host:port:username:password:use_tls:timeout:address_list4:address_list6:mask4:mask6:comment:timeout_addr:cache_ttl +// e.g. "192.168.1.1:8728:admin:password:false:10:my_list4:my_list6:24:32:from_dns:3600:3600" func QuickSetup(_ sequence.BQ, s string) (any, error) { parts := strings.Split(s, ":") if len(parts) < 6 { @@ -129,5 +130,12 @@ func QuickSetup(_ sequence.BQ, s string) (any, error) { } } + // 解析缓存 TTL + if len(parts) > 12 { + if cacheTTL, err := strconv.Atoi(parts[12]); err == nil { + args.CacheTTL = cacheTTL + } + } + return newMikrotikAddressListPlugin(args) } diff --git a/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go b/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go index 78e0a97..0bf4acf 100644 --- a/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go +++ b/plugin/executable/mikrotik_addresslist/mikrotik_addresslist_impl.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/IrineSistiana/mosdns/v5/pkg/query_context" "github.com/miekg/dns" @@ -43,6 +44,11 @@ type mikrotikAddressListPlugin struct { workerPool chan struct{} wg sync.WaitGroup mu sync.RWMutex // 保护连接的重连操作 + + // 内存缓存 + cache map[string]time.Time // key: "listName:cidrAddr", value: 添加时间 + cacheMu sync.RWMutex // 保护缓存访问 + cacheTTL time.Duration // 缓存 TTL,默认 1 小时 } func newMikrotikAddressListPlugin(args *Args) (*mikrotikAddressListPlugin, error) { @@ -77,11 +83,19 @@ func newMikrotikAddressListPlugin(args *Args) (*mikrotikAddressListPlugin, error // 设置工作池大小(可以根据需要调整) workerCount := 10 // 并发工作线程数 + // 设置缓存 TTL + cacheTTL := time.Hour // 默认 1 小时 + if args.CacheTTL > 0 { + cacheTTL = time.Duration(args.CacheTTL) * time.Second + } + plugin := &mikrotikAddressListPlugin{ args: args, conn: conn, log: zap.L().Named("mikrotik_addresslist"), workerPool: make(chan struct{}, workerCount), + cache: make(map[string]time.Time), + cacheTTL: cacheTTL, } // 记录连接成功信息 @@ -90,7 +104,8 @@ func newMikrotikAddressListPlugin(args *Args) (*mikrotikAddressListPlugin, error zap.Int("port", args.Port), zap.String("username", args.Username), zap.String("address_list4", args.AddressList4), - zap.Int("worker_count", workerCount)) + zap.Int("worker_count", workerCount), + zap.Duration("cache_ttl", cacheTTL)) return plugin, nil } @@ -138,6 +153,70 @@ func (p *mikrotikAddressListPlugin) reconnect() error { return nil } +// 生成缓存键 +func (p *mikrotikAddressListPlugin) cacheKey(listName, cidrAddr string) string { + return listName + ":" + cidrAddr +} + +// 检查缓存中是否存在 +func (p *mikrotikAddressListPlugin) isInCache(listName, cidrAddr string) bool { + p.cacheMu.RLock() + defer p.cacheMu.RUnlock() + + key := p.cacheKey(listName, cidrAddr) + if addTime, exists := p.cache[key]; exists { + // 检查是否过期 + if time.Since(addTime) < p.cacheTTL { + return true + } + // 过期了,删除 + p.cacheMu.RUnlock() + p.cacheMu.Lock() + delete(p.cache, key) + p.cacheMu.Unlock() + p.cacheMu.RLock() + } + return false +} + +// 添加到缓存 +func (p *mikrotikAddressListPlugin) addToCache(listName, cidrAddr string) { + p.cacheMu.Lock() + defer p.cacheMu.Unlock() + + key := p.cacheKey(listName, cidrAddr) + p.cache[key] = time.Now() + + // 清理过期的缓存项 + p.cleanupExpiredCache() +} + +// 清理过期的缓存项 +func (p *mikrotikAddressListPlugin) cleanupExpiredCache() { + now := time.Now() + for key, addTime := range p.cache { + if now.Sub(addTime) >= p.cacheTTL { + delete(p.cache, key) + } + } +} + +// 获取缓存统计信息 +func (p *mikrotikAddressListPlugin) getCacheStats() (int, int) { + p.cacheMu.RLock() + defer p.cacheMu.RUnlock() + + total := len(p.cache) + valid := 0 + now := time.Now() + for _, addTime := range p.cache { + if now.Sub(addTime) < p.cacheTTL { + valid++ + } + } + return total, valid +} + func (p *mikrotikAddressListPlugin) Exec(_ context.Context, qCtx *query_context.Context) error { // 检查连接是否正常 if p.conn == nil { @@ -163,6 +242,14 @@ func (p *mikrotikAddressListPlugin) Close() error { // 等待所有工作完成 p.wg.Wait() + // 清理缓存 + p.cacheMu.Lock() + cacheSize := len(p.cache) + p.cache = nil + p.cacheMu.Unlock() + + p.log.Info("plugin closed", zap.Int("cache_cleared", cacheSize)) + // 关闭连接 p.mu.Lock() defer p.mu.Unlock() @@ -212,55 +299,20 @@ func (p *mikrotikAddressListPlugin) addToAddressList(r *dns.Msg) error { return nil } - // 并发处理所有地址 - var wg sync.WaitGroup - var mu sync.Mutex - var errors []error - addedCount := 0 + // 动态调整工作池大小 + p.adjustWorkerPoolSize(len(addresses)) - for _, addr := range addresses { - wg.Add(1) - go func(addr netip.Addr) { - defer wg.Done() - - // 获取工作池槽位 - select { - case p.workerPool <- struct{}{}: - defer func() { <-p.workerPool }() - default: - // 如果工作池满了,直接处理(避免阻塞) - p.log.Debug("worker pool full, processing directly") - } - - if err := p.addAddressToMikrotik(addr, p.args.AddressList4, p.args.Mask4); err != nil { - mu.Lock() - errors = append(errors, err) - mu.Unlock() - } else { - mu.Lock() - addedCount++ - mu.Unlock() - } - }(addr) + // 使用优化的批量处理 + if err := p.batchAddAddresses(addresses, p.args.AddressList4, p.args.Mask4); err != nil { + return err } - // 等待所有工作完成 - wg.Wait() - - // 记录结果 - if addedCount > 0 { - p.log.Info("concurrently added IPv4 addresses to MikroTik", - zap.Int("success_count", addedCount), - zap.Int("total_count", len(addresses)), - zap.Int("error_count", len(errors))) - } else { - p.log.Debug("no IPv4 addresses added to MikroTik") - } - - // 如果有错误,返回第一个错误 - if len(errors) > 0 { - return fmt.Errorf("some addresses failed to add: %v", errors[0]) - } + // 记录缓存统计信息 + total, valid := p.getCacheStats() + p.log.Info("IPv4 addresses processed", + zap.Int("processed_count", len(addresses)), + zap.Int("cache_total", total), + zap.Int("cache_valid", valid)) return nil } @@ -285,14 +337,21 @@ func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listNa p.log.Debug("checking address", zap.String("cidr", cidrAddr), zap.String("list", listName)) - // 检查地址是否已存在 + // 首先检查内存缓存 + if p.isInCache(listName, cidrAddr) { + p.log.Debug("address found in cache, skipping", zap.String("cidr", cidrAddr), zap.String("list", listName)) + return nil + } + + // 缓存中没有,检查 MikroTik 中是否已存在 exists, err := p.addressExists(listName, cidrAddr) if err != nil { // 如果检查失败,可能是地址列表不存在,继续尝试添加 - p.log.Debug("failed to check if address exists, will try to add anyway", zap.Error(err)) + p.log.Debug("failed to check if address exists in MikroTik, will try to add anyway", zap.Error(err)) } else if exists { - // 地址已存在,跳过 - p.log.Debug("address already exists", zap.String("cidr", cidrAddr), zap.String("list", listName)) + // 地址已存在于 MikroTik 中,添加到缓存并跳过 + p.log.Debug("address already exists in MikroTik, adding to cache", zap.String("cidr", cidrAddr), zap.String("list", listName)) + p.addToCache(listName, cidrAddr) return nil } @@ -320,53 +379,64 @@ func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listNa p.log.Debug("Add to list: ", zap.Strings("params", params)) - // 发送到 RouterOS,带重试机制 + // 发送到 RouterOS,带智能重试机制 maxRetries := 3 + backoffDuration := 100 * time.Millisecond + for i := 0; i < maxRetries; i++ { // 使用读锁保护连接访问 p.mu.RLock() conn := p.conn + isConnected := p.isConnected p.mu.RUnlock() - if conn == nil { - p.log.Error("connection is nil") - return fmt.Errorf("connection is nil") + if conn == nil || !isConnected { + p.log.Debug("connection not available, attempting to reconnect") + p.mu.Lock() + p.isConnected = false + if err := p.reconnect(); err != nil { + p.mu.Unlock() + p.log.Error("failed to reconnect", zap.Error(err)) + time.Sleep(backoffDuration) + backoffDuration *= 2 // 指数退避 + continue + } + conn = p.conn + p.mu.Unlock() } args := append([]string{"/ip/firewall/address-list/add"}, params...) _, err = conn.Run(args...) if err != nil { if strings.Contains(err.Error(), "already have such entry") { - p.log.Debug("Already exists: ", zap.String("cidr", cidrAddr)) + p.log.Debug("Address already exists", zap.String("cidr", cidrAddr)) + p.addToCache(listName, cidrAddr) // 添加到缓存 return nil } - // 如果是连接错误,尝试重新连接 - if strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "connection") { - p.log.Warn("connection error, attempting to reconnect", + // 检查是否为连接错误 + if p.isConnectionError(err) { + p.log.Warn("connection error, will retry", zap.String("cidr", cidrAddr), zap.Int("retry", i+1), zap.Error(err)) - // 使用写锁保护重连操作 p.mu.Lock() - if err := p.reconnect(); err != nil { - p.mu.Unlock() - p.log.Error("failed to reconnect", zap.Error(err)) - continue - } + p.isConnected = false p.mu.Unlock() - // 重试 + // 指数退避 + time.Sleep(backoffDuration) + backoffDuration *= 2 continue } - // 其他错误,记录并返回 + // 其他错误,直接返回 p.log.Error("failed to add address to MikroTik", zap.String("cidr", cidrAddr), zap.String("list", listName), zap.Error(err)) - return fmt.Errorf("failed to add address %s to list %s: from RouterOS device: %v", cidrAddr, listName, err) + return fmt.Errorf("failed to add address %s to list %s: %v", cidrAddr, listName, err) } // 成功,跳出重试循环 @@ -377,6 +447,9 @@ func (p *mikrotikAddressListPlugin) addAddressToMikrotik(addr netip.Addr, listNa zap.String("cidr", cidrAddr), zap.String("list", listName)) + // 添加到缓存 + p.addToCache(listName, cidrAddr) + return nil } @@ -410,3 +483,105 @@ func (p *mikrotikAddressListPlugin) addressExists(listName, address string) (boo return exists, nil } + +// batchAddAddresses 批量添加地址到MikroTik(批量操作优化) +func (p *mikrotikAddressListPlugin) batchAddAddresses(addresses []netip.Addr, listName string, mask int) error { + if len(addresses) == 0 { + return nil + } + + // 分批处理,每批10个地址 + batchSize := 10 + var wg sync.WaitGroup + var mu sync.Mutex + var errors []error + successCount := 0 + + for i := 0; i < len(addresses); i += batchSize { + end := i + batchSize + if end > len(addresses) { + end = len(addresses) + } + + batch := addresses[i:end] + wg.Add(1) + + go func(batch []netip.Addr) { + defer wg.Done() + + // 获取工作池槽位 + select { + case p.workerPool <- struct{}{}: + defer func() { <-p.workerPool }() + default: + p.log.Debug("worker pool full, processing batch directly") + } + + for _, addr := range batch { + if err := p.addAddressToMikrotik(addr, listName, mask); err != nil { + mu.Lock() + errors = append(errors, err) + mu.Unlock() + } else { + mu.Lock() + successCount++ + mu.Unlock() + } + } + }(batch) + } + + wg.Wait() + + if len(errors) > 0 { + p.log.Error("batch processing completed with errors", + zap.Int("success_count", successCount), + zap.Int("error_count", len(errors)), + zap.Error(errors[0])) + return errors[0] + } + + p.log.Info("batch processing completed successfully", + zap.Int("success_count", successCount), + zap.Int("total_count", len(addresses))) + + return nil +} + +// adjustWorkerPoolSize 动态调整工作池大小 +func (p *mikrotikAddressListPlugin) adjustWorkerPoolSize(addressCount int) { + var targetSize int + switch { + case addressCount <= 5: + targetSize = 3 + case addressCount <= 20: + targetSize = 5 + case addressCount <= 50: + targetSize = 10 + default: + targetSize = 15 + } + + // 如果当前容量不够,创建新的工作池 + if cap(p.workerPool) < targetSize { + p.log.Debug("adjusting worker pool size", + zap.Int("old_size", cap(p.workerPool)), + zap.Int("new_size", targetSize), + zap.Int("address_count", addressCount)) + + // 创建新的工作池 + p.workerPool = make(chan struct{}, targetSize) + } +} + +// isConnectionError 检查是否为连接错误 +func (p *mikrotikAddressListPlugin) isConnectionError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return strings.Contains(errStr, "EOF") || + strings.Contains(errStr, "connection") || + strings.Contains(errStr, "closed") || + strings.Contains(errStr, "timeout") +}