Files
sub2api/backend/internal/service/ops_cleanup_service.go
erio 4b6954f9f0 feat(ops): allow retention days = 0 to wipe table on each scheduled cleanup
Background / 背景

The ops cleanup task currently rejects retention days < 1 in both validate
and normalize, so operators who want minimal-history setups (e.g. high
churn deployments that prefer near-realtime cleanup) cannot express that
intent through the UI. The only options are 1+ days, which keeps at least
24h of history regardless of cron frequency.

ops 清理任务目前在 validate 和 normalize 两处都拒绝小于 1 的保留天数,
让希望尽量不留历史的运维场景(高吞吐部署 + 想用近实时清理)无法通过 UI
表达。最低只能配 1,等于不管 cron 多频繁,至少都会保留 24 小时的历史。

Purpose / 目的

Let admins set retention days to 0, meaning "every scheduled cleanup
run wipes the corresponding table(s) entirely". Combined with a more
frequent cron (e.g. `0 * * * *`) this yields effectively rolling cleanup.

允许管理员把保留天数设为 0,语义为"每次定时清理时把对应表全部清空"。
搭配更频繁的 cron(比如每小时整点)即可获得近似滚动清理的效果。

Changes / 改动内容

Backend

- service/ops_settings.go: validate accepts [0, 365]; normalize only
  refills default 30 when value is < 0 (negative is treated as legacy
  bad data, 0 is honoured)
- service/ops_cleanup_service.go: introduce `opsCleanupPlan(now, days)`
  returning `(cutoff, truncate, ok)`. days==0 returns truncate=true and
  short-circuits to a new `truncateOpsTable` helper that uses
  `TRUNCATE TABLE` (itself O(1) in row count, no per-row WAL records, no
  dead tuples left for VACUUM). days>0 keeps the existing batched DELETE
  path unchanged. Empty tables skip TRUNCATE to avoid the ACCESS
  EXCLUSIVE lock entirely
- Extract `isMissingRelationError` helper to dedupe the "table not
  yet created" tolerance shared by both delete and truncate paths
- Add unit tests for `opsCleanupPlan` (three branches) and
  `isMissingRelationError`

后端

- service/ops_settings.go: validate 接受 [0, 365];normalize 仅在 < 0
  时回填默认 30(负数视为脏数据,0 被尊重)
- service/ops_cleanup_service.go: 抽 `opsCleanupPlan(now, days)` 返回
  `(cutoff, truncate, ok)`。days==0 → truncate=true,走新增
  `truncateOpsTable`(TRUNCATE TABLE,本身 O(1),不逐行写 WAL、不产生需要 VACUUM 清理的死元组);
  days>0 仍走原批量 DELETE 路径,行为完全不变。空表跳过 TRUNCATE,
  避免无意义的 ACCESS EXCLUSIVE 锁
- 抽 `isMissingRelationError` helper 复用 delete / truncate 两处的
  "表不存在"宽容判断
- 补 `opsCleanupPlan` 三分支 + `isMissingRelationError` 单元测试

Frontend

- OpsSettingsDialog.vue: validation accepts [0, 365]; input min=0
- i18n (zh/en): hint mentions "0 = wipe all on every cleanup",
  validation message updated to 0-365 range

前端

- OpsSettingsDialog.vue: 校验放宽到 [0, 365],input min 改 0
- i18n(zh/en):hint 补"0 = 每次清理时清空所有",错误提示改 0-365

Trade-offs / 取舍

- TRUNCATE requires ACCESS EXCLUSIVE lock briefly, but ops tables only
  have the cleanup task as a writer, so the lock is invisible to other
  workloads
- Empty-table guard avoids the lock when there is nothing to clean
- Negative values are still treated as legacy bad data and replaced
  with default 30 to preserve compatibility
2026-04-29 15:01:02 +08:00

464 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/Wei-Shaw/sub2api/internal/config"
	"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
	"github.com/robfig/cron/v3"
)
// Identifiers and defaults used by the ops cleanup job.
const (
// opsCleanupJobName is the job name written to the heartbeat record.
opsCleanupJobName = "ops_cleanup"
// opsCleanupLeaderLockKeyDefault is the leader-lock key: used directly as the
// Redis key and hashed into the ID of the DB advisory-lock fallback.
opsCleanupLeaderLockKeyDefault = "ops:cleanup:leader"
// opsCleanupLeaderLockTTLDefault is the expiry set on the Redis leader lock.
opsCleanupLeaderLockTTLDefault = 30 * time.Minute
)
// opsCleanupCronParser parses 5-field cron specs (minute, hour, day-of-month,
// month, day-of-week); no seconds field is enabled.
var opsCleanupCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
// opsCleanupReleaseScript is the Lua script used to release the Redis leader
// lock. It deletes KEYS[1] only when its current value equals ARGV[1] (the
// caller's token) and returns the DEL result; otherwise it returns 0, so an
// instance never removes a lock whose value it did not set.
var opsCleanupReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
// OpsCleanupService periodically deletes old ops data to prevent unbounded DB growth.
//
// - Scheduling: 5-field cron spec (minute hour dom month dow).
// - Multi-instance: best-effort Redis leader lock so only one node runs cleanup.
// - Safety: deletes in batches to avoid long transactions.
//
// In addition, runCleanupOnce finishes by calling
// ChannelMonitorService.RunDailyMaintenance, so that maintenance shares this
// service's cron schedule, leader lock and heartbeat instead of needing a
// scheduler of its own.
type OpsCleanupService struct {
// opsRepo is used to persist the job heartbeat.
opsRepo OpsRepository
// db executes the cleanup statements and backs the advisory-lock fallback.
db *sql.DB
// redisClient provides the preferred leader lock; it may be nil.
redisClient *redis.Client
// cfg supplies the enable flags, schedule, timezone, run mode and retention days.
cfg *config.Config
// channelMonitorSvc is optional; when non-nil its daily maintenance runs after cleanup.
channelMonitorSvc *ChannelMonitorService
// instanceID is a per-process UUID stored as the Redis lock value.
instanceID string
// cron is the scheduler created by Start; it stays nil until Start succeeds.
cron *cron.Cron
// startOnce and stopOnce make the bodies of Start and Stop run at most once.
startOnce sync.Once
stopOnce sync.Once
// warnNoRedisOnce limits the "falling back to DB advisory lock" warning to one log line.
warnNoRedisOnce sync.Once
}
// NewOpsCleanupService wires the given dependencies into a new
// OpsCleanupService and assigns it a freshly generated UUID as instance ID.
// No validation is done here; Start decides whether the service can run.
func NewOpsCleanupService(
	opsRepo OpsRepository,
	db *sql.DB,
	redisClient *redis.Client,
	cfg *config.Config,
	channelMonitorSvc *ChannelMonitorService,
) *OpsCleanupService {
	svc := new(OpsCleanupService)
	svc.opsRepo = opsRepo
	svc.db = db
	svc.redisClient = redisClient
	svc.cfg = cfg
	svc.channelMonitorSvc = channelMonitorSvc
	svc.instanceID = uuid.NewString()
	return svc
}
// Start registers the cleanup job with a cron scheduler and starts it.
//
// It is a no-op when the receiver is nil, when ops or ops cleanup is disabled
// in the config, or when the repository or DB handle is missing. The
// scheduling itself runs at most once per service instance.
//
// The schedule defaults to "0 2 * * *" and the location to time.Local; both
// are overridden by non-blank config values. A timezone that fails to load is
// ignored and the default location is kept.
func (s *OpsCleanupService) Start() {
	if s == nil {
		return
	}
	if cfg := s.cfg; cfg != nil {
		if !cfg.Ops.Enabled {
			return
		}
		if !cfg.Ops.Cleanup.Enabled {
			logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started (disabled)")
			return
		}
	}
	if s.opsRepo == nil || s.db == nil {
		logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started (missing deps)")
		return
	}

	s.startOnce.Do(func() {
		spec, zone := "0 2 * * *", time.Local
		if s.cfg != nil {
			if configured := strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule); configured != "" {
				spec = configured
			}
			if name := strings.TrimSpace(s.cfg.Timezone); name != "" {
				if loaded, err := time.LoadLocation(name); err == nil && loaded != nil {
					zone = loaded
				}
			}
		}

		scheduler := cron.New(cron.WithParser(opsCleanupCronParser), cron.WithLocation(zone))
		if _, err := scheduler.AddFunc(spec, s.runScheduled); err != nil {
			logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started (invalid schedule=%q): %v", spec, err)
			return
		}

		s.cron = scheduler
		s.cron.Start()
		logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] started (schedule=%q tz=%s)", spec, zone.String())
	})
}
// Stop halts the cron scheduler, if one was started, and waits up to three
// seconds for the scheduler's stop context to finish before logging a timeout.
// It is safe on a nil receiver and its body runs at most once.
func (s *OpsCleanupService) Stop() {
	if s == nil {
		return
	}
	s.stopOnce.Do(func() {
		if s.cron == nil {
			return
		}
		stopped := s.cron.Stop().Done()

		deadline := time.NewTimer(3 * time.Second)
		defer deadline.Stop()

		select {
		case <-stopped:
		case <-deadline.C:
			logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron stop timed out")
		}
	})
}
// runScheduled is the cron callback. Under a 30-minute context it tries to
// become leader, runs one cleanup pass, and records the outcome as a job
// heartbeat (success with the per-table counts, or error with the message).
// Nothing is recorded when the leader lock is not obtained.
func (s *OpsCleanupService) runScheduled() {
	if s == nil || s.db == nil || s.opsRepo == nil {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
	defer cancel()

	unlock, leader := s.tryAcquireLeaderLock(ctx)
	if !leader {
		return
	}
	// Deferred after cancel, so the lock is released before the context is cancelled.
	if unlock != nil {
		defer unlock()
	}

	began := time.Now().UTC()
	deleted, err := s.runCleanupOnce(ctx)
	elapsed := time.Since(began)

	if err == nil {
		s.recordHeartbeatSuccess(began, elapsed, deleted)
		logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cleanup complete: %s", deleted)
		return
	}
	s.recordHeartbeatError(began, elapsed, err)
	logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cleanup failed: %v", err)
}
// opsCleanupDeletedCounts holds the number of rows removed from each ops
// table during one cleanup pass.
type opsCleanupDeletedCounts struct {
	errorLogs     int64
	retryAttempts int64
	alertEvents   int64
	systemLogs    int64
	logAudits     int64
	systemMetrics int64
	hourlyPreagg  int64
	dailyPreagg   int64
}

// String renders the counts as space-separated key=value pairs in a fixed
// order; this text is what gets logged and stored as the heartbeat result.
func (c opsCleanupDeletedCounts) String() string {
	const layout = "error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d"

	// Values are listed in the same order as the verbs in layout.
	values := []any{
		c.errorLogs,
		c.retryAttempts,
		c.alertEvents,
		c.systemLogs,
		c.logAudits,
		c.systemMetrics,
		c.hourlyPreagg,
		c.dailyPreagg,
	}
	return fmt.Sprintf(layout, values...)
}
// opsCleanupPlan translates a "retention days" setting into a cleanup action.
//
//   - days < 0:  skip this cleanup entirely (ok=false).
//   - days == 0: wipe everything (truncate=true, ok=true); cutoff is the zero time.
//   - days > 0:  delete rows older than now minus days (cutoff set, ok=true).
//
// The function only decides; the caller picks the TRUNCATE helper or the
// batched DELETE helper based on the returned flags.
func opsCleanupPlan(now time.Time, days int) (cutoff time.Time, truncate, ok bool) {
	switch {
	case days < 0:
		return time.Time{}, false, false
	case days == 0:
		return time.Time{}, true, true
	default:
		return now.AddDate(0, 0, -days), false, true
	}
}
// runCleanupOnce performs one full cleanup pass and returns the per-table row
// counts. It is a no-op (zero counts, nil error) when the receiver, DB handle
// or config is nil.
//
// Three retention settings are applied in order: error-like tables, the
// minute-level metrics table, then the hourly and daily pre-aggregation
// tables. The first table error aborts the pass and is returned together with
// the counts gathered so far. When a channel monitor service is configured,
// its daily maintenance runs last; a failure there is only logged.
func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) {
	var counts opsCleanupDeletedCounts
	if s == nil || s.db == nil || s.cfg == nil {
		return counts, nil
	}

	const batchSize = 5000
	now := time.Now().UTC()

	// target names one table to clean and where its row count is stored.
	type target struct {
		table    string
		timeCol  string
		castDate bool
		removed  *int64
	}

	// sweep applies one retention setting to each target in order, choosing
	// between truncate and batched delete, and stops at the first error.
	sweep := func(retentionDays int, targets ...target) error {
		cutoff, truncate, ok := opsCleanupPlan(now, retentionDays)
		if !ok {
			return nil
		}
		for _, tg := range targets {
			var (
				n   int64
				err error
			)
			if truncate {
				n, err = truncateOpsTable(ctx, s.db, tg.table)
			} else {
				n, err = deleteOldRowsByID(ctx, s.db, tg.table, tg.timeCol, cutoff, batchSize, tg.castDate)
			}
			if err != nil {
				return err
			}
			*tg.removed = n
		}
		return nil
	}

	// Error-like tables: error logs / retry attempts / alert events / system logs / cleanup audits.
	if err := sweep(s.cfg.Ops.Cleanup.ErrorLogRetentionDays,
		target{"ops_error_logs", "created_at", false, &counts.errorLogs},
		target{"ops_retry_attempts", "created_at", false, &counts.retryAttempts},
		target{"ops_alert_events", "created_at", false, &counts.alertEvents},
		target{"ops_system_logs", "created_at", false, &counts.systemLogs},
		target{"ops_system_log_cleanup_audits", "created_at", false, &counts.logAudits},
	); err != nil {
		return counts, err
	}

	// Minute-level metrics snapshots.
	if err := sweep(s.cfg.Ops.Cleanup.MinuteMetricsRetentionDays,
		target{"ops_system_metrics", "created_at", false, &counts.systemMetrics},
	); err != nil {
		return counts, err
	}

	// Pre-aggregation tables (hourly/daily); the daily table compares against a date.
	if err := sweep(s.cfg.Ops.Cleanup.HourlyMetricsRetentionDays,
		target{"ops_metrics_hourly", "bucket_start", false, &counts.hourlyPreagg},
		target{"ops_metrics_daily", "bucket_date", true, &counts.dailyPreagg},
	); err != nil {
		return counts, err
	}

	// Channel monitor daily maintenance. A failure is logged but does not
	// turn a successful ops cleanup into an error, and it is not itemised in
	// the heartbeat result.
	if s.channelMonitorSvc != nil {
		if err := s.channelMonitorSvc.RunDailyMaintenance(ctx); err != nil {
			logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] channel monitor maintenance failed: %v", err)
		}
	}
	return counts, nil
}
// deleteOldRowsByID deletes rows of table whose timeColumn is earlier than
// cutoff. It works in batches: each statement selects at most batchSize ids in
// ascending id order and deletes them, and the loop ends when a statement
// affects no rows.
//
// A nil db is a no-op. A non-positive batchSize falls back to 5000. When
// castCutoffToDate is true the cutoff parameter is cast to a date in SQL. A
// "relation does not exist" error ends the loop without an error; any other
// error is returned with the number of rows deleted so far.
//
// table and timeColumn are interpolated into the SQL text, so they must be
// trusted identifiers.
func deleteOldRowsByID(
	ctx context.Context,
	db *sql.DB,
	table string,
	timeColumn string,
	cutoff time.Time,
	batchSize int,
	castCutoffToDate bool,
) (int64, error) {
	if db == nil {
		return 0, nil
	}
	if batchSize <= 0 {
		batchSize = 5000
	}

	predicate := "%s < $1"
	if castCutoffToDate {
		predicate = "%s < $1::date"
	}
	stmt := fmt.Sprintf(`
WITH batch AS (
SELECT id FROM %s
WHERE %s
ORDER BY id
LIMIT $2
)
DELETE FROM %s
WHERE id IN (SELECT id FROM batch)
`, table, fmt.Sprintf(predicate, timeColumn), table)

	var deleted int64
	for {
		res, err := db.ExecContext(ctx, stmt, cutoff, batchSize)
		switch {
		case err == nil:
		case isMissingRelationError(err):
			// If ops tables aren't present yet (partial deployments), treat as no-op.
			return deleted, nil
		default:
			return deleted, err
		}

		n, err := res.RowsAffected()
		if err != nil {
			return deleted, err
		}
		if n == 0 {
			return deleted, nil
		}
		deleted += n
	}
}
// truncateOpsTable empties table with TRUNCATE TABLE and returns the row count
// read by a SELECT COUNT(*) issued just before the truncate; the count is
// what the heartbeat reports.
//
// Unlike deleteOldRowsByID it takes no WHERE condition and is meant only for
// the "retention days == 0, wipe everything" case. An empty table is left
// alone, so no TRUNCATE is issued for it. A nil db or a missing table yields
// (0, nil), matching deleteOldRowsByID; other errors are wrapped with the
// failing step and table name.
//
// NOTE(review): the count and the truncate are two separate statements, so
// rows inserted in between are removed but not counted, and COUNT(*) itself
// is not constant-time on large tables — confirm this is acceptable.
func truncateOpsTable(ctx context.Context, db *sql.DB, table string) (int64, error) {
	if db == nil {
		return 0, nil
	}

	var rows int64
	countSQL := fmt.Sprintf("SELECT COUNT(*) FROM %s", table)
	err := db.QueryRowContext(ctx, countSQL).Scan(&rows)
	switch {
	case err == nil:
	case isMissingRelationError(err):
		return 0, nil
	default:
		return 0, fmt.Errorf("count %s: %w", table, err)
	}
	if rows == 0 {
		return 0, nil
	}

	truncateSQL := fmt.Sprintf("TRUNCATE TABLE %s", table)
	_, err = db.ExecContext(ctx, truncateSQL)
	switch {
	case err == nil:
		return rows, nil
	case isMissingRelationError(err):
		return 0, nil
	default:
		return 0, fmt.Errorf("truncate %s: %w", table, err)
	}
}
// isMissingRelationError reports whether err means "the table does not
// exist", which lets the cleanup job skip tables silently on partial
// deployments.
//
// Detection happens in two steps:
//  1. If any error in err's chain exposes a non-empty SQLState() string, the
//     answer is whether that code is 42P01 (PostgreSQL undefined_table). This
//     does not depend on the server's message language and does not confuse
//     other "... of relation ... does not exist" errors with a missing table.
//  2. Otherwise it falls back to a case-insensitive check that the message
//     contains both "relation" and "does not exist".
//
// A nil err returns false.
func isMissingRelationError(err error) bool {
	if err == nil {
		return false
	}

	// Matched structurally so no driver package has to be imported here.
	var coded interface{ SQLState() string }
	if errors.As(err, &coded) {
		if state := coded.SQLState(); state != "" {
			return state == "42P01"
		}
	}

	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "relation") && strings.Contains(msg, "does not exist")
}
// tryAcquireLeaderLock tries to make this instance the one that runs cleanup.
// It returns a release function (possibly nil) and whether the caller may
// proceed.
//
//   - Simple run mode: proceeds without any lock (nil release).
//   - Redis available: SetNX on the leader key with this instance's ID and
//     the default TTL. If another holder exists the caller must not proceed.
//   - Redis missing or SetNX failed: falls back to a DB advisory lock; a
//     warning is logged once per service instance.
//
// NOTE(review): the Redis release closure reuses the ctx passed in here; if
// that ctx is already done when release runs, the script call presumably
// fails and the key is left to expire via its TTL — confirm this is intended.
func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
	if s == nil {
		return nil, false
	}
	// In simple run mode, assume single instance.
	if s.cfg != nil && s.cfg.RunMode == config.RunModeSimple {
		return nil, true
	}

	lockKey := opsCleanupLeaderLockKeyDefault

	// viaAdvisoryLock is the shared fallback path for both "no Redis" and "Redis error".
	viaAdvisoryLock := func() (func(), bool) {
		unlock, held := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(lockKey))
		if !held {
			return nil, false
		}
		return unlock, true
	}

	if s.redisClient == nil {
		s.warnNoRedisOnce.Do(func() {
			logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] redis not configured; using DB advisory lock")
		})
		return viaAdvisoryLock()
	}

	// Prefer the Redis leader lock, but on a Redis error fall back to the DB
	// advisory lock rather than letting every instance hit the DB at once.
	acquired, err := s.redisClient.SetNX(ctx, lockKey, s.instanceID, opsCleanupLeaderLockTTLDefault).Result()
	if err != nil {
		s.warnNoRedisOnce.Do(func() {
			logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] leader lock SetNX failed; falling back to DB advisory lock: %v", err)
		})
		return viaAdvisoryLock()
	}
	if !acquired {
		return nil, false
	}

	unlock := func() {
		// Best-effort: the script only deletes the key if it still holds our instance ID.
		_, _ = opsCleanupReleaseScript.Run(ctx, s.redisClient, []string{lockKey}, s.instanceID).Result()
	}
	return unlock, true
}
// recordHeartbeatSuccess upserts the job heartbeat after a successful run:
// the run start time, the current UTC time as last success, the duration in
// milliseconds, and the counts summary truncated via truncateString(…, 2048).
// It is a no-op without a receiver or repository, uses its own 2-second
// context, and deliberately ignores the upsert error (best-effort).
func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration, counts opsCleanupDeletedCounts) {
	if s == nil || s.opsRepo == nil {
		return
	}

	var (
		succeededAt = time.Now().UTC()
		elapsedMs   = duration.Milliseconds()
		summary     = truncateString(counts.String(), 2048)
	)

	beat := &OpsUpsertJobHeartbeatInput{JobName: opsCleanupJobName}
	beat.LastRunAt = &runAt
	beat.LastSuccessAt = &succeededAt
	beat.LastDurationMs = &elapsedMs
	beat.LastResult = &summary

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	_ = s.opsRepo.UpsertJobHeartbeat(ctx, beat)
}
// recordHeartbeatError upserts the job heartbeat after a failed run: the run
// start time, the current UTC time as last error, the error message truncated
// via truncateString(…, 2048), and the duration in milliseconds. It is a
// no-op without a receiver, repository or error, uses its own 2-second
// context, and deliberately ignores the upsert error (best-effort).
func (s *OpsCleanupService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
	if s == nil || s.opsRepo == nil || err == nil {
		return
	}

	var (
		failedAt  = time.Now().UTC()
		elapsedMs = duration.Milliseconds()
		reason    = truncateString(err.Error(), 2048)
	)

	beat := &OpsUpsertJobHeartbeatInput{JobName: opsCleanupJobName}
	beat.LastRunAt = &runAt
	beat.LastErrorAt = &failedAt
	beat.LastError = &reason
	beat.LastDurationMs = &elapsedMs

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	_ = s.opsRepo.UpsertJobHeartbeat(ctx, beat)
}