Files
sub2api/backend/internal/service/subscription_maintenance_queue.go

89 lines
1.8 KiB
Go
Raw Permalink Normal View History

package service
import (
"fmt"
"log"
"sync"
)
// SubscriptionMaintenanceQueue is a background executor built from a bounded
// queue that is drained by a fixed number of worker goroutines.
//
// It exists so that maintenance actions triggered from the request hot path
// cannot cause unbounded goroutine growth: a task that does not fit into the
// queue is rejected immediately instead of blocking the caller or spawning a
// new goroutine.
//
// Create instances with NewSubscriptionMaintenanceQueue. A nil pointer or a
// zero-value queue has no workers: TryEnqueue returns an error and Stop does
// not panic. The struct embeds locks and must not be copied after first use.
type SubscriptionMaintenanceQueue struct {
	queue  chan func()    // pending tasks; closed exactly once by Stop
	wg     sync.WaitGroup // tracks the worker goroutines so Stop can wait for them
	stop   sync.Once      // makes Stop idempotent
	mu     sync.RWMutex   // keeps the closed check and the channel send/close atomic
	closed bool           // set by Stop; guarded by mu
}

// NewSubscriptionMaintenanceQueue starts workerCount worker goroutines that
// execute tasks taken from a queue holding at most queueSize pending tasks.
//
// Non-positive values for either argument are replaced by 1. The workers run
// until Stop is called.
func NewSubscriptionMaintenanceQueue(workerCount, queueSize int) *SubscriptionMaintenanceQueue {
	if workerCount <= 0 {
		workerCount = 1
	}
	if queueSize <= 0 {
		queueSize = 1
	}

	q := &SubscriptionMaintenanceQueue{
		queue: make(chan func(), queueSize),
	}

	// Register every worker before starting any of them so that a concurrent
	// Stop can never observe a partially counted WaitGroup.
	q.wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go q.work()
	}
	return q
}

// work executes queued tasks until the queue channel is closed and drained.
func (q *SubscriptionMaintenanceQueue) work() {
	defer q.wg.Done()
	for task := range q.queue {
		q.runTask(task)
	}
}

// runTask runs a single task and converts a panic into a log line so that one
// misbehaving task cannot kill its worker goroutine.
func (q *SubscriptionMaintenanceQueue) runTask(task func()) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("SubscriptionMaintenance worker panic: %v", r)
		}
	}()
	task()
}

// TryEnqueue attempts to hand task to the workers without waiting for queue
// space.
//
// It returns an error, and never panics, when:
//   - the receiver or the task is nil;
//   - the queue has been stopped;
//   - the queue was not created by NewSubscriptionMaintenanceQueue;
//   - the queue is full, in which case the caller is expected to skip the
//     task and emit a (rate-limited) warning.
func (q *SubscriptionMaintenanceQueue) TryEnqueue(task func()) error {
	if q == nil {
		return fmt.Errorf("maintenance queue is nil")
	}
	if task == nil {
		return fmt.Errorf("maintenance task is nil")
	}

	// The read lock is held across the send so Stop cannot close the channel
	// between the closed check and the send (which would panic).
	q.mu.RLock()
	defer q.mu.RUnlock()

	if q.closed {
		return fmt.Errorf("maintenance queue stopped")
	}
	if q.queue == nil {
		// Zero-value queue: a send on a nil channel is never ready, so without
		// this check the caller would be told the queue is "full".
		return fmt.Errorf("maintenance queue is not initialized")
	}

	select {
	case q.queue <- task:
		return nil
	default:
		return fmt.Errorf("maintenance queue full")
	}
}

// Stop rejects further tasks, lets the workers finish everything that is
// already queued, and blocks until all workers have exited.
//
// Stop is safe to call multiple times, concurrently, and on a nil or
// zero-value queue. It must not be called from inside a queued task: the
// worker running that task would wait for itself and deadlock.
func (q *SubscriptionMaintenanceQueue) Stop() {
	if q == nil {
		return
	}
	q.stop.Do(func() {
		q.mu.Lock()
		q.closed = true
		// A zero-value queue has a nil channel; closing it would panic.
		if q.queue != nil {
			close(q.queue)
		}
		q.mu.Unlock()

		// Wait outside the lock so TryEnqueue callers fail fast with
		// "stopped" instead of blocking while the workers drain.
		q.wg.Wait()
	})
}