2026-02-10 00:37:47 +08:00
|
|
|
|
package service
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"log"
|
|
|
|
|
|
"sync"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
2026-02-10 17:51:49 +08:00
|
|
|
|
// SubscriptionMaintenanceQueue 提供"有界队列 + 固定 worker"的后台执行器。
|
2026-02-10 00:37:47 +08:00
|
|
|
|
// 用于从请求热路径触发维护动作时,避免无限 goroutine 膨胀。
|
|
|
|
|
|
type SubscriptionMaintenanceQueue struct {
|
2026-02-10 17:51:49 +08:00
|
|
|
|
queue chan func()
|
|
|
|
|
|
wg sync.WaitGroup
|
|
|
|
|
|
stop sync.Once
|
|
|
|
|
|
mu sync.RWMutex // 保护 closed 标志与 channel 操作的原子性
|
|
|
|
|
|
closed bool
|
2026-02-10 00:37:47 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func NewSubscriptionMaintenanceQueue(workerCount, queueSize int) *SubscriptionMaintenanceQueue {
|
|
|
|
|
|
if workerCount <= 0 {
|
|
|
|
|
|
workerCount = 1
|
|
|
|
|
|
}
|
|
|
|
|
|
if queueSize <= 0 {
|
|
|
|
|
|
queueSize = 1
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
q := &SubscriptionMaintenanceQueue{
|
|
|
|
|
|
queue: make(chan func(), queueSize),
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
q.wg.Add(workerCount)
|
|
|
|
|
|
for i := 0; i < workerCount; i++ {
|
|
|
|
|
|
go func(workerID int) {
|
|
|
|
|
|
defer q.wg.Done()
|
|
|
|
|
|
for fn := range q.queue {
|
|
|
|
|
|
func() {
|
|
|
|
|
|
defer func() {
|
|
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
|
|
log.Printf("SubscriptionMaintenance worker panic: %v", r)
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
fn()
|
|
|
|
|
|
}()
|
|
|
|
|
|
}
|
|
|
|
|
|
}(i)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return q
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// TryEnqueue 尝试将任务入队。
|
|
|
|
|
|
// 当队列已满时返回 error(调用方应该选择跳过并记录告警/限频日志)。
|
2026-02-10 17:51:49 +08:00
|
|
|
|
// 当队列已关闭时返回 error,不会 panic。
|
2026-02-10 00:37:47 +08:00
|
|
|
|
func (q *SubscriptionMaintenanceQueue) TryEnqueue(task func()) error {
|
|
|
|
|
|
if q == nil {
|
|
|
|
|
|
return fmt.Errorf("maintenance queue is nil")
|
|
|
|
|
|
}
|
|
|
|
|
|
if task == nil {
|
|
|
|
|
|
return fmt.Errorf("maintenance task is nil")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-10 17:51:49 +08:00
|
|
|
|
q.mu.RLock()
|
|
|
|
|
|
defer q.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
if q.closed {
|
|
|
|
|
|
return fmt.Errorf("maintenance queue stopped")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-02-10 00:37:47 +08:00
|
|
|
|
select {
|
|
|
|
|
|
case q.queue <- task:
|
|
|
|
|
|
return nil
|
|
|
|
|
|
default:
|
|
|
|
|
|
return fmt.Errorf("maintenance queue full")
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (q *SubscriptionMaintenanceQueue) Stop() {
|
|
|
|
|
|
if q == nil {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
q.stop.Do(func() {
|
2026-02-10 17:51:49 +08:00
|
|
|
|
q.mu.Lock()
|
|
|
|
|
|
q.closed = true
|
2026-02-10 00:37:47 +08:00
|
|
|
|
close(q.queue)
|
2026-02-10 17:51:49 +08:00
|
|
|
|
q.mu.Unlock()
|
2026-02-10 00:37:47 +08:00
|
|
|
|
q.wg.Wait()
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|