Files
sub2api/backend/internal/repository/channel_monitor_repo.go
erio ef6ec8a15a fix(channel-monitor): drop soft delete, refactor feature flag to declarative form
### 后端修复:日志表不该用软删除

channel_monitor_histories / channel_monitor_daily_rollups 都是日志/聚合表,
没有恢复需求。110 里加的 SoftDeleteMixin 会让 DELETE 自动变成 UPDATE deleted_at,
导致行和索引只增不减,徒增磁盘占用和查询成本。

改回分批物理删(参考 OpsCleanupService.deleteOldRowsByID 模板):

- ent schema 移除 SoftDeleteMixin,重新 go generate
- repo 新增 deleteChannelMonitorBatched 辅助 + 两条 prune SQL 常量
  (WITH batch AS SELECT id LIMIT 5000 → DELETE IN batch)
- DeleteHistoryBefore / DeleteRollupsBefore 改调分批 raw SQL
- 移除 ComputeAvailability / ComputeAvailabilityForMonitors / UpsertDailyRollupsFor /
  ListLatestPerModel / ListLatestForMonitorIDs / ListRecentHistoryForMonitors 等
  raw SQL 中的 deleted_at IS NULL 过滤
- UpsertDailyRollupsFor 的 ON CONFLICT 去掉 deleted_at = NULL 重置
- migration 111 DROP COLUMN deleted_at + 对应索引(110 已部署但 maintenance
  首跑在次日 02:00,此时尚无业务数据在依赖软删除)

### 前端重构:feature flag 声明式 + 复用

AppSidebar.vue 里 7 处 `...(flag ? [item] : [])` 样板代码删光,改为 NavItem 加
featureFlag?: () => boolean | undefined 字段,加一个 applyFeatureFlags 递归
过滤(含 children)。语义统一为 `!== false`(宽容策略,undefined 时默认显示,
避免 public settings 未加载完成时菜单闪烁消失 — 对应用户反馈"刷新后菜单消失
要去保存设置才回来")。

- 集中声明 4 个 flag getter:flagChannelMonitor / flagPayment /
  flagOpsMonitoring / flagAdminPayment
- 提取 buildSelfNavItems 复用用户端主菜单和管理员"我的账户"子菜单
- 未来新增开关:在统一位置加一个 flag getter + 给对应 NavItem 加字段
  (不用再动渲染逻辑)

bump 0.1.114.29
2026-04-23 17:31:15 +08:00

743 lines
27 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package repository
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
dbent "github.com/Wei-Shaw/sub2api/ent"
"github.com/Wei-Shaw/sub2api/ent/channelmonitor"
"github.com/Wei-Shaw/sub2api/ent/channelmonitorhistory"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/lib/pq"
)
// channelMonitorRepository implements service.ChannelMonitorRepository.
//
// Design notes:
//   - CRUD goes through ent so it can reuse the project's transaction-context support.
//   - Aggregate queries (latest per model / availability) use raw SQL, which avoids
//     ent's GROUP BY boilerplate and keeps control over which indexes are hit.
type channelMonitorRepository struct {
	client *dbent.Client // ent client used by the CRUD and history-list methods
	db     *sql.DB       // raw handle used by the hand-written SQL queries
}
// NewChannelMonitorRepository wires an ent client and a raw *sql.DB into a repository
// and returns it behind the service.ChannelMonitorRepository interface.
func NewChannelMonitorRepository(client *dbent.Client, db *sql.DB) service.ChannelMonitorRepository {
	repo := new(channelMonitorRepository)
	repo.client = client
	repo.db = db
	return repo
}
// ---------- CRUD ----------
// Create inserts a new monitor and writes the generated ID, CreatedAt and UpdatedAt
// back into m.
//
// The ent client is obtained via clientFromContext (presumably a transaction-bound
// client when ctx carries one, otherwise r.client). A nil ExtraModels is stored as an
// empty slice. Save errors are mapped through translatePersistenceError.
func (r *channelMonitorRepository) Create(ctx context.Context, m *service.ChannelMonitor) error {
	saved, err := clientFromContext(ctx, r.client).ChannelMonitor.
		Create().
		SetName(m.Name).
		SetProvider(channelmonitor.Provider(m.Provider)).
		SetEndpoint(m.Endpoint).
		SetAPIKeyEncrypted(m.APIKey). // the caller already passes ciphertext here
		SetPrimaryModel(m.PrimaryModel).
		SetExtraModels(emptySliceIfNil(m.ExtraModels)).
		SetGroupName(m.GroupName).
		SetEnabled(m.Enabled).
		SetIntervalSeconds(m.IntervalSeconds).
		SetCreatedBy(m.CreatedBy).
		Save(ctx)
	if err != nil {
		return translatePersistenceError(err, service.ErrChannelMonitorNotFound, nil)
	}

	// Reflect the database-generated values on the caller's struct.
	m.ID, m.CreatedAt, m.UpdatedAt = saved.ID, saved.CreatedAt, saved.UpdatedAt
	return nil
}
// GetByID loads the monitor with the given id and converts it to the service model.
//
// The query runs on r.client directly (not on a context-derived client). Errors from
// ent's Only, including the not-found case, are mapped through
// translatePersistenceError with service.ErrChannelMonitorNotFound.
func (r *channelMonitorRepository) GetByID(ctx context.Context, id int64) (*service.ChannelMonitor, error) {
	byID := r.client.ChannelMonitor.Query().Where(channelmonitor.IDEQ(id))
	found, err := byID.Only(ctx)
	if err != nil {
		return nil, translatePersistenceError(err, service.ErrChannelMonitorNotFound, nil)
	}
	return entToServiceMonitor(found), nil
}
// Update overwrites every editable column of the monitor identified by m.ID and copies
// the refreshed UpdatedAt back into m.
//
// CreatedBy and LastCheckedAt are not touched. m.APIKey is written to the encrypted
// column unchanged, and a nil ExtraModels is stored as an empty slice. Save errors are
// mapped through translatePersistenceError.
func (r *channelMonitorRepository) Update(ctx context.Context, m *service.ChannelMonitor) error {
	saved, err := clientFromContext(ctx, r.client).ChannelMonitor.
		UpdateOneID(m.ID).
		SetName(m.Name).
		SetProvider(channelmonitor.Provider(m.Provider)).
		SetEndpoint(m.Endpoint).
		SetAPIKeyEncrypted(m.APIKey).
		SetPrimaryModel(m.PrimaryModel).
		SetExtraModels(emptySliceIfNil(m.ExtraModels)).
		SetGroupName(m.GroupName).
		SetEnabled(m.Enabled).
		SetIntervalSeconds(m.IntervalSeconds).
		Save(ctx)
	if err != nil {
		return translatePersistenceError(err, service.ErrChannelMonitorNotFound, nil)
	}

	m.UpdatedAt = saved.UpdatedAt
	return nil
}
// Delete removes the monitor with the given id through the context-derived ent client.
// Errors are mapped through translatePersistenceError with
// service.ErrChannelMonitorNotFound.
func (r *channelMonitorRepository) Delete(ctx context.Context, id int64) error {
	err := clientFromContext(ctx, r.client).ChannelMonitor.DeleteOneID(id).Exec(ctx)
	if err != nil {
		return translatePersistenceError(err, service.ErrChannelMonitorNotFound, nil)
	}
	return nil
}
// List returns one page of monitors, newest ID first, plus the total number of rows
// matching the filters (counted before pagination).
//
// Filters, all optional and AND-ed together:
//   - params.Provider: exact match when non-empty.
//   - params.Enabled: exact match when non-nil.
//   - params.Search: after trimming, matched with ent's ContainsFold predicates against
//     name, group name or primary model.
//
// Pagination defaults: PageSize <= 0 becomes 20, Page <= 0 becomes 1.
func (r *channelMonitorRepository) List(ctx context.Context, params service.ChannelMonitorListParams) ([]*service.ChannelMonitor, int64, error) {
	query := r.client.ChannelMonitor.Query()
	if params.Provider != "" {
		query = query.Where(channelmonitor.ProviderEQ(channelmonitor.Provider(params.Provider)))
	}
	if enabled := params.Enabled; enabled != nil {
		query = query.Where(channelmonitor.EnabledEQ(*enabled))
	}
	if keyword := strings.TrimSpace(params.Search); keyword != "" {
		query = query.Where(channelmonitor.Or(
			channelmonitor.NameContainsFold(keyword),
			channelmonitor.GroupNameContainsFold(keyword),
			channelmonitor.PrimaryModelContainsFold(keyword),
		))
	}

	count, err := query.Count(ctx)
	if err != nil {
		return nil, 0, fmt.Errorf("count monitors: %w", err)
	}

	// Normalise pagination input before computing the offset.
	size, pageNo := params.PageSize, params.Page
	if size <= 0 {
		size = 20
	}
	if pageNo <= 0 {
		pageNo = 1
	}
	offset := (pageNo - 1) * size

	records, err := query.
		Order(dbent.Desc(channelmonitor.FieldID)).
		Offset(offset).
		Limit(size).
		All(ctx)
	if err != nil {
		return nil, 0, fmt.Errorf("list monitors: %w", err)
	}

	monitors := make([]*service.ChannelMonitor, len(records))
	for i := range records {
		monitors[i] = entToServiceMonitor(records[i])
	}
	return monitors, int64(count), nil
}
// ---------- 调度器辅助 ----------
// ListEnabled returns every monitor whose enabled flag is true, converted to the
// service model. No ordering or pagination is applied. The result is a non-nil slice
// even when no monitor matches.
func (r *channelMonitorRepository) ListEnabled(ctx context.Context) ([]*service.ChannelMonitor, error) {
	onlyEnabled := r.client.ChannelMonitor.Query().Where(channelmonitor.EnabledEQ(true))
	records, err := onlyEnabled.All(ctx)
	if err != nil {
		return nil, fmt.Errorf("list enabled monitors: %w", err)
	}

	monitors := make([]*service.ChannelMonitor, len(records))
	for i := range records {
		monitors[i] = entToServiceMonitor(records[i])
	}
	return monitors, nil
}
// MarkChecked sets last_checked_at of the monitor with the given id to checkedAt,
// using the context-derived ent client. Errors are mapped through
// translatePersistenceError with service.ErrChannelMonitorNotFound.
func (r *channelMonitorRepository) MarkChecked(ctx context.Context, id int64, checkedAt time.Time) error {
	stamp := clientFromContext(ctx, r.client).ChannelMonitor.
		UpdateOneID(id).
		SetLastCheckedAt(checkedAt)
	if err := stamp.Exec(ctx); err != nil {
		return translatePersistenceError(err, service.ErrChannelMonitorNotFound, nil)
	}
	return nil
}
// InsertHistoryBatch writes all given history rows with a single ent CreateBulk call.
//
// An empty input is a no-op. LatencyMs and PingLatencyMs are only set when their
// pointers are non-nil, so missing measurements are left unset on the builder.
func (r *channelMonitorRepository) InsertHistoryBatch(ctx context.Context, rows []*service.ChannelMonitorHistoryRow) error {
	if len(rows) == 0 {
		return nil
	}

	client := clientFromContext(ctx, r.client)
	creates := make([]*dbent.ChannelMonitorHistoryCreate, len(rows))
	for i, src := range rows {
		create := client.ChannelMonitorHistory.Create().
			SetMonitorID(src.MonitorID).
			SetModel(src.Model).
			SetStatus(channelmonitorhistory.Status(src.Status)).
			SetMessage(src.Message).
			SetCheckedAt(src.CheckedAt)
		if latency := src.LatencyMs; latency != nil {
			create = create.SetLatencyMs(*latency)
		}
		if ping := src.PingLatencyMs; ping != nil {
			create = create.SetPingLatencyMs(*ping)
		}
		creates[i] = create
	}

	_, err := client.ChannelMonitorHistory.CreateBulk(creates...).Save(ctx)
	if err != nil {
		return fmt.Errorf("insert history bulk: %w", err)
	}
	return nil
}
// DeleteHistoryBefore physically deletes history rows whose checked_at is earlier than
// before and returns the number of rows removed.
//
// The work is delegated to deleteChannelMonitorBatched with
// channelMonitorPruneHistorySQL, so rows are removed in batches of at most
// channelMonitorPruneBatchSize instead of one large DELETE; the original author's
// rationale is to limit lock and WAL pressure.
// NOTE(review): the original note says each batch is located via an index on
// (checked_at); the index definition is not visible in this file — confirm.
func (r *channelMonitorRepository) DeleteHistoryBefore(ctx context.Context, before time.Time) (int64, error) {
	return deleteChannelMonitorBatched(ctx, r.db, channelMonitorPruneHistorySQL, before)
}
// ListHistory returns up to limit history entries of one monitor, newest checked_at
// first.
//
// When model is blank after trimming, no model filter is applied; otherwise only rows
// whose model equals the given (untrimmed) value are returned. limit is passed to the
// query unchanged.
func (r *channelMonitorRepository) ListHistory(ctx context.Context, monitorID int64, model string, limit int) ([]*service.ChannelMonitorHistoryEntry, error) {
	query := r.client.ChannelMonitorHistory.Query().
		Where(channelmonitorhistory.MonitorIDEQ(monitorID))
	if filterByModel := strings.TrimSpace(model) != ""; filterByModel {
		query = query.Where(channelmonitorhistory.ModelEQ(model))
	}

	records, err := query.
		Order(dbent.Desc(channelmonitorhistory.FieldCheckedAt)).
		Limit(limit).
		All(ctx)
	if err != nil {
		return nil, fmt.Errorf("list history: %w", err)
	}

	entries := make([]*service.ChannelMonitorHistoryEntry, len(records))
	for i, rec := range records {
		entries[i] = &service.ChannelMonitorHistoryEntry{
			ID:            rec.ID,
			Model:         rec.Model,
			Status:        string(rec.Status),
			LatencyMs:     rec.LatencyMs,
			PingLatencyMs: rec.PingLatencyMs,
			Message:       rec.Message,
			CheckedAt:     rec.CheckedAt,
		}
	}
	return entries, nil
}
// ---------- 用户视图聚合 (原生 SQL) ----------
// ListLatestPerModel returns, for one monitor, the most recent history row of each
// model, using PostgreSQL's DISTINCT ON (model) ordered by checked_at DESC.
//
// NULL latency columns leave the corresponding pointer fields nil. On a query, scan or
// row-iteration error the returned slice is nil, matching ListLatestForMonitorIDs; a
// partially filled slice is never returned alongside an error.
// NOTE(review): the original note says an index on (monitor_id, model, checked_at DESC)
// lets this run as an index scan; the index definition is not in this file — confirm.
func (r *channelMonitorRepository) ListLatestPerModel(ctx context.Context, monitorID int64) ([]*service.ChannelMonitorLatest, error) {
	const q = `
SELECT DISTINCT ON (model)
model, status, latency_ms, ping_latency_ms, checked_at
FROM channel_monitor_histories
WHERE monitor_id = $1
ORDER BY model, checked_at DESC
`
	rows, err := r.db.QueryContext(ctx, q, monitorID)
	if err != nil {
		return nil, fmt.Errorf("query latest per model: %w", err)
	}
	defer func() { _ = rows.Close() }()

	out := make([]*service.ChannelMonitorLatest, 0)
	for rows.Next() {
		l := &service.ChannelMonitorLatest{}
		var latency, ping sql.NullInt64
		if err := rows.Scan(&l.Model, &l.Status, &latency, &ping, &l.CheckedAt); err != nil {
			return nil, fmt.Errorf("scan latest row: %w", err)
		}
		assignNullInt(&l.LatencyMs, latency)
		assignNullInt(&l.PingLatencyMs, ping)
		out = append(out, l)
	}
	// rows.Next also stops on a mid-stream failure; do not return the rows gathered so
	// far as if they were a complete result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return out, nil
}
// assignNullInt unpacks a sql.NullInt64 into an *int target: when n is valid, *dst is
// pointed at a freshly allocated int holding the value; when n is NULL, *dst is left
// untouched. It exists so the latency / ping scan sites do not repeat the same
// "if valid then allocate" snippet.
func assignNullInt(dst **int, n sql.NullInt64) {
	if n.Valid {
		value := int(n.Int64)
		*dst = &value
	}
}
// ComputeAvailability computes, per model, the availability and average latency of one
// monitor over a look-back window. A check counts as available when its status is
// 'operational' or 'degraded'.
//
// The query merges two sources:
//   - raw: channel_monitor_histories rows with checked_at >= CURRENT_DATE, aggregated
//     per model;
//   - rollup: channel_monitor_daily_rollups rows with bucket_date in
//     [CURRENT_DATE - windowDays, CURRENT_DATE).
//
// The effective window is therefore "today plus the previous windowDays days", one day
// wider than windowDays suggests. The original author accepts this as an unavoidable
// loss of precision from day-granular rollups, erring on the side of more data.
// NOTE(review): the original notes say detail rows are kept for only one day and that
// days are cut at UTC boundaries; CURRENT_DATE follows the database session time
// zone, so this assumes a UTC session — confirm.
//
// windowDays <= 0 falls back to 7. On a query, scan or row-iteration error the returned
// slice is nil, matching ComputeAvailabilityForMonitors; a partially filled slice is
// never returned alongside an error.
func (r *channelMonitorRepository) ComputeAvailability(ctx context.Context, monitorID int64, windowDays int) ([]*service.ChannelMonitorAvailability, error) {
	if windowDays <= 0 {
		windowDays = 7
	}
	const q = `
WITH raw AS (
SELECT model,
COUNT(*) AS total_checks,
COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_count,
COALESCE(SUM(latency_ms) FILTER (WHERE latency_ms IS NOT NULL), 0) AS sum_latency_ms,
COUNT(latency_ms) AS count_latency
FROM channel_monitor_histories
WHERE monitor_id = $1
AND checked_at >= CURRENT_DATE
GROUP BY model
),
rollup AS (
SELECT model, total_checks, ok_count, sum_latency_ms, count_latency
FROM channel_monitor_daily_rollups
WHERE monitor_id = $1
AND bucket_date >= (CURRENT_DATE - $2::int)
AND bucket_date < CURRENT_DATE
)
SELECT model,
SUM(total_checks) AS total,
SUM(ok_count) AS ok,
CASE WHEN SUM(count_latency) > 0
THEN SUM(sum_latency_ms)::float8 / SUM(count_latency)
ELSE NULL END AS avg_latency_ms
FROM (SELECT * FROM raw UNION ALL SELECT * FROM rollup) combined
GROUP BY model
`
	rows, err := r.db.QueryContext(ctx, q, monitorID, windowDays)
	if err != nil {
		return nil, fmt.Errorf("query availability: %w", err)
	}
	defer func() { _ = rows.Close() }()

	out := make([]*service.ChannelMonitorAvailability, 0)
	for rows.Next() {
		row, err := scanAvailabilityRow(rows, windowDays)
		if err != nil {
			return nil, err
		}
		out = append(out, row)
	}
	// rows.Next also stops on a mid-stream failure; do not return the rows gathered so
	// far as if they were a complete result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return out, nil
}
// scanAvailabilityRow scans one (model, total, ok, avg_latency) row into a
// ChannelMonitorAvailability whose WindowDays is preset to windowDays.
//
// It serves the 4-column result of ComputeAvailability only; the batch query has an
// extra leading monitor_id column and therefore scans inline and calls
// finalizeAvailabilityRow itself.
func scanAvailabilityRow(rows interface{ Scan(...any) error }, windowDays int) (*service.ChannelMonitorAvailability, error) {
	var (
		avg    sql.NullFloat64
		result = &service.ChannelMonitorAvailability{WindowDays: windowDays}
	)
	err := rows.Scan(&result.Model, &result.TotalChecks, &result.OperationalChecks, &avg)
	if err != nil {
		return nil, fmt.Errorf("scan availability row: %w", err)
	}
	finalizeAvailabilityRow(result, avg)
	return result, nil
}
// finalizeAvailabilityRow fills the derived fields of row:
//   - AvailabilityPct = OperationalChecks * 100 / TotalChecks, left at its current
//     value when TotalChecks is not positive;
//   - AvgLatencyMs = the average latency truncated to int, left untouched when the
//     average is NULL.
//
// Both availability queries share it so the division and the NullFloat64 unpacking are
// maintained in one place.
func finalizeAvailabilityRow(row *service.ChannelMonitorAvailability, avgLatency sql.NullFloat64) {
	if total := row.TotalChecks; total > 0 {
		row.AvailabilityPct = float64(row.OperationalChecks) * 100.0 / float64(total)
	}
	if !avgLatency.Valid {
		return
	}
	ms := int(avgLatency.Float64)
	row.AvgLatencyMs = &ms
}
// ListLatestForMonitorIDs fetches, in one query, the most recent history row for every
// (monitor_id, model) pair of the given monitors, grouped by monitor id in the result.
//
// It relies on PostgreSQL's DISTINCT ON. An empty ids slice returns an empty map
// without touching the database; monitors without history are absent from the map.
// NOTE(review): the original note says an index on (monitor_id, model, checked_at DESC)
// lets this run as an index scan; the index definition is not in this file — confirm.
func (r *channelMonitorRepository) ListLatestForMonitorIDs(ctx context.Context, ids []int64) (map[int64][]*service.ChannelMonitorLatest, error) {
	latestByMonitor := make(map[int64][]*service.ChannelMonitorLatest, len(ids))
	if len(ids) == 0 {
		return latestByMonitor, nil
	}

	const q = `
SELECT DISTINCT ON (monitor_id, model)
monitor_id, model, status, latency_ms, ping_latency_ms, checked_at
FROM channel_monitor_histories
WHERE monitor_id = ANY($1)
ORDER BY monitor_id, model, checked_at DESC
`
	rows, err := r.db.QueryContext(ctx, q, pq.Array(ids))
	if err != nil {
		return nil, fmt.Errorf("query latest batch: %w", err)
	}
	defer func() { _ = rows.Close() }()

	for rows.Next() {
		var (
			owner     int64
			latencyMs sql.NullInt64
			pingMs    sql.NullInt64
			latest    = new(service.ChannelMonitorLatest)
		)
		scanErr := rows.Scan(&owner, &latest.Model, &latest.Status, &latencyMs, &pingMs, &latest.CheckedAt)
		if scanErr != nil {
			return nil, fmt.Errorf("scan latest batch row: %w", scanErr)
		}
		assignNullInt(&latest.LatencyMs, latencyMs)
		assignNullInt(&latest.PingLatencyMs, pingMs)
		latestByMonitor[owner] = append(latestByMonitor[owner], latest)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return latestByMonitor, nil
}
// ListRecentHistoryForMonitors fetches, for several monitors at once, the most recent
// history rows of each monitor's designated model, newest checked_at first.
//
// primaryModels[monitorID] names the model to keep for that monitor; monitors missing
// from primaryModels (or mapped to a blank model) contribute no rows. The SQL builds a
// (monitor_id, model) allow-list by unnesting two parallel arrays in a CTE, then uses
// ROW_NUMBER() OVER (PARTITION BY monitor_id) to keep the first perMonitorLimit rows of
// each monitor. perMonitorLimit is clamped by clampTimelineLimit.
//
// Returned entries carry status, latencies and checked_at only; message, id and model
// are not selected, which keeps the payload small. If no usable (id, model) pair exists
// the result is an empty map and no query is issued.
func (r *channelMonitorRepository) ListRecentHistoryForMonitors(
	ctx context.Context,
	ids []int64,
	primaryModels map[int64]string,
	perMonitorLimit int,
) (map[int64][]*service.ChannelMonitorHistoryEntry, error) {
	timelines := make(map[int64][]*service.ChannelMonitorHistoryEntry, len(ids))

	targetIDs, targetModels := buildMonitorModelPairs(ids, primaryModels)
	if len(targetIDs) == 0 {
		return timelines, nil
	}
	rowCap := clampTimelineLimit(perMonitorLimit)

	const q = `
WITH targets AS (
SELECT unnest($1::bigint[]) AS monitor_id,
unnest($2::text[]) AS model
),
ranked AS (
SELECT h.monitor_id,
h.status,
h.latency_ms,
h.ping_latency_ms,
h.checked_at,
ROW_NUMBER() OVER (PARTITION BY h.monitor_id ORDER BY h.checked_at DESC) AS rn
FROM channel_monitor_histories h
JOIN targets t
ON t.monitor_id = h.monitor_id AND t.model = h.model
)
SELECT monitor_id, status, latency_ms, ping_latency_ms, checked_at
FROM ranked
WHERE rn <= $3
ORDER BY monitor_id, checked_at DESC
`
	rows, err := r.db.QueryContext(ctx, q, pq.Array(targetIDs), pq.Array(targetModels), rowCap)
	if err != nil {
		return nil, fmt.Errorf("query recent history batch: %w", err)
	}
	defer func() { _ = rows.Close() }()

	for rows.Next() {
		var (
			owner     int64
			latencyMs sql.NullInt64
			pingMs    sql.NullInt64
			item      = new(service.ChannelMonitorHistoryEntry)
		)
		scanErr := rows.Scan(&owner, &item.Status, &latencyMs, &pingMs, &item.CheckedAt)
		if scanErr != nil {
			return nil, fmt.Errorf("scan recent history row: %w", scanErr)
		}
		assignNullInt(&item.LatencyMs, latencyMs)
		assignNullInt(&item.PingLatencyMs, pingMs)
		timelines[owner] = append(timelines[owner], item)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return timelines, nil
}
// buildMonitorModelPairs turns ids plus the primaryModels lookup into two parallel
// slices of equal length, where pairIDs[i] belongs with pairModels[i]. The slices are
// meant to be expanded side by side with unnest in SQL.
//
// An id is skipped when it has no entry in primaryModels or when its model is blank
// after trimming; the model itself is passed through untrimmed. Each id is emitted at
// most once, at its first position in ids: a repeated id would otherwise produce a
// repeated (monitor_id, model) target and make the JOIN in
// ListRecentHistoryForMonitors return every history row of that monitor twice.
//
// Returns (nil, nil) when ids or primaryModels is empty.
func buildMonitorModelPairs(ids []int64, primaryModels map[int64]string) ([]int64, []string) {
	if len(ids) == 0 || len(primaryModels) == 0 {
		return nil, nil
	}
	pairIDs := make([]int64, 0, len(ids))
	pairModels := make([]string, 0, len(ids))
	seen := make(map[int64]struct{}, len(ids))
	for _, id := range ids {
		if _, dup := seen[id]; dup {
			continue
		}
		seen[id] = struct{}{}

		model, ok := primaryModels[id]
		if !ok || strings.TrimSpace(model) == "" {
			continue
		}
		pairIDs = append(pairIDs, id)
		pairModels = append(pairModels, model)
	}
	return pairIDs, pairModels
}
// Bounds applied to perMonitorLimit in the batch timeline query.
const (
	// timelineLimitMin guarantees that at least the most recent entry is returned.
	timelineLimitMin = 1
	// timelineLimitMax caps rows per monitor, bounding both the response size and the
	// ROW_NUMBER window the query has to keep.
	timelineLimitMax = 200
)

// clampTimelineLimit forces n into [timelineLimitMin, timelineLimitMax] so that invalid
// or oversized limits never reach the query.
func clampTimelineLimit(n int) int {
	switch {
	case n < timelineLimitMin:
		return timelineLimitMin
	case n > timelineLimitMax:
		return timelineLimitMax
	default:
		return n
	}
}
// ComputeAvailabilityForMonitors computes, in one query, the per-model availability
// and average latency of several monitors over a look-back window, grouped by monitor
// id in the result.
//
// It mirrors ComputeAvailability: history rows with checked_at >= CURRENT_DATE are
// aggregated on the fly and merged via UNION ALL with daily rollups whose bucket_date
// lies in [CURRENT_DATE - windowDays, CURRENT_DATE). An empty ids slice returns an
// empty map without querying; windowDays <= 0 falls back to 7.
func (r *channelMonitorRepository) ComputeAvailabilityForMonitors(ctx context.Context, ids []int64, windowDays int) (map[int64][]*service.ChannelMonitorAvailability, error) {
	statsByMonitor := make(map[int64][]*service.ChannelMonitorAvailability, len(ids))
	if len(ids) == 0 {
		return statsByMonitor, nil
	}
	if windowDays <= 0 {
		windowDays = 7
	}

	const q = `
WITH raw AS (
SELECT monitor_id,
model,
COUNT(*) AS total_checks,
COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_count,
COALESCE(SUM(latency_ms) FILTER (WHERE latency_ms IS NOT NULL), 0) AS sum_latency_ms,
COUNT(latency_ms) AS count_latency
FROM channel_monitor_histories
WHERE monitor_id = ANY($1)
AND checked_at >= CURRENT_DATE
GROUP BY monitor_id, model
),
rollup AS (
SELECT monitor_id, model, total_checks, ok_count, sum_latency_ms, count_latency
FROM channel_monitor_daily_rollups
WHERE monitor_id = ANY($1)
AND bucket_date >= (CURRENT_DATE - $2::int)
AND bucket_date < CURRENT_DATE
)
SELECT monitor_id,
model,
SUM(total_checks) AS total,
SUM(ok_count) AS ok,
CASE WHEN SUM(count_latency) > 0
THEN SUM(sum_latency_ms)::float8 / SUM(count_latency)
ELSE NULL END AS avg_latency_ms
FROM (SELECT * FROM raw UNION ALL SELECT * FROM rollup) combined
GROUP BY monitor_id, model
`
	rows, err := r.db.QueryContext(ctx, q, pq.Array(ids), windowDays)
	if err != nil {
		return nil, fmt.Errorf("query availability batch: %w", err)
	}
	defer func() { _ = rows.Close() }()

	for rows.Next() {
		var (
			owner int64
			avg   sql.NullFloat64
			stat  = &service.ChannelMonitorAvailability{WindowDays: windowDays}
		)
		scanErr := rows.Scan(&owner, &stat.Model, &stat.TotalChecks, &stat.OperationalChecks, &avg)
		if scanErr != nil {
			return nil, fmt.Errorf("scan availability batch row: %w", scanErr)
		}
		// Apart from the leading monitor_id column the row has the same shape as in
		// the single-monitor query, so the percentage / average-latency conversion is
		// shared through finalizeAvailabilityRow.
		finalizeAvailabilityRow(stat, avg)
		statsByMonitor[owner] = append(statsByMonitor[owner], stat)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return statsByMonitor, nil
}
// ---------- 聚合维护 ----------
// UpsertDailyRollupsFor aggregates the history rows of one day, i.e. checked_at in
// [targetDate, targetDate + 1 day), per (monitor_id, model) and writes the result into
// channel_monitor_daily_rollups with bucket_date = targetDate.
//
//   - ON CONFLICT (monitor_id, model, bucket_date) DO UPDATE makes the operation an
//     idempotent backfill: re-running it overwrites the counters with fresh figures.
//   - The parameter is cast with $1::date in SQL, so callers need not truncate
//     targetDate themselves.
//     NOTE(review): the original note says the cast yields the UTC date; that depends
//     on how the driver encodes the time value — confirm callers pass UTC.
//
// Returns the number of rows reported as affected by the statement.
func (r *channelMonitorRepository) UpsertDailyRollupsFor(ctx context.Context, targetDate time.Time) (int64, error) {
	const q = `
INSERT INTO channel_monitor_daily_rollups (
monitor_id, model, bucket_date,
total_checks, ok_count,
operational_count, degraded_count, failed_count, error_count,
sum_latency_ms, count_latency,
sum_ping_latency_ms, count_ping_latency,
computed_at
)
SELECT
monitor_id,
model,
$1::date AS bucket_date,
COUNT(*) AS total_checks,
COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_count,
COUNT(*) FILTER (WHERE status = 'operational') AS operational_count,
COUNT(*) FILTER (WHERE status = 'degraded') AS degraded_count,
COUNT(*) FILTER (WHERE status = 'failed') AS failed_count,
COUNT(*) FILTER (WHERE status = 'error') AS error_count,
COALESCE(SUM(latency_ms) FILTER (WHERE latency_ms IS NOT NULL), 0) AS sum_latency_ms,
COUNT(latency_ms) AS count_latency,
COALESCE(SUM(ping_latency_ms) FILTER (WHERE ping_latency_ms IS NOT NULL), 0) AS sum_ping_latency_ms,
COUNT(ping_latency_ms) AS count_ping_latency,
NOW()
FROM channel_monitor_histories
WHERE checked_at >= $1::date
AND checked_at < ($1::date + INTERVAL '1 day')
GROUP BY monitor_id, model
ON CONFLICT (monitor_id, model, bucket_date) DO UPDATE SET
total_checks = EXCLUDED.total_checks,
ok_count = EXCLUDED.ok_count,
operational_count = EXCLUDED.operational_count,
degraded_count = EXCLUDED.degraded_count,
failed_count = EXCLUDED.failed_count,
error_count = EXCLUDED.error_count,
sum_latency_ms = EXCLUDED.sum_latency_ms,
count_latency = EXCLUDED.count_latency,
sum_ping_latency_ms = EXCLUDED.sum_ping_latency_ms,
count_ping_latency = EXCLUDED.count_ping_latency,
computed_at = NOW()
`
	result, execErr := r.db.ExecContext(ctx, q, targetDate)
	if execErr != nil {
		day := targetDate.Format("2006-01-02")
		return 0, fmt.Errorf("upsert daily rollups for %s: %w", day, execErr)
	}

	affected, countErr := result.RowsAffected()
	if countErr != nil {
		return 0, fmt.Errorf("rows affected (upsert rollups): %w", countErr)
	}
	return affected, nil
}
// DeleteRollupsBefore physically deletes rollup rows whose bucket_date is earlier than
// beforeDate and returns the number of rows removed. Like DeleteHistoryBefore it
// delegates to deleteChannelMonitorBatched, here with channelMonitorPruneRollupSQL, so
// rows are removed in batches; the SQL casts the cutoff to ::date before comparing.
func (r *channelMonitorRepository) DeleteRollupsBefore(ctx context.Context, beforeDate time.Time) (int64, error) {
	return deleteChannelMonitorBatched(ctx, r.db, channelMonitorPruneRollupSQL, beforeDate)
}
// Pruning configuration used by deleteChannelMonitorBatched and its two callers.
const (
	// channelMonitorPruneBatchSize is the maximum number of rows removed by a single
	// prune statement. Deleting a large table in small id batches avoids long
	// transactions and WAL build-up; per the original note, 5000 matches the value
	// used by ops_cleanup_service.
	channelMonitorPruneBatchSize = 5000

	// channelMonitorPruneHistorySQL physically deletes one batch of expired history
	// rows: $1 is the checked_at cutoff, $2 the batch size.
	channelMonitorPruneHistorySQL = `
WITH batch AS (
SELECT id FROM channel_monitor_histories
WHERE checked_at < $1
ORDER BY id
LIMIT $2
)
DELETE FROM channel_monitor_histories
WHERE id IN (SELECT id FROM batch)
`

	// channelMonitorPruneRollupSQL physically deletes one batch of expired rollup
	// rows: $1 is the cutoff, cast to ::date so it compares consistently with the
	// bucket_date column, and $2 is the batch size.
	channelMonitorPruneRollupSQL = `
WITH batch AS (
SELECT id FROM channel_monitor_daily_rollups
WHERE bucket_date < $1::date
ORDER BY id
LIMIT $2
)
DELETE FROM channel_monitor_daily_rollups
WHERE id IN (SELECT id FROM batch)
`
)
// deleteChannelMonitorBatched runs the given batch DELETE repeatedly until a round
// reports zero affected rows, and returns the accumulated number of deleted rows.
//
// query must take the cutoff as $1 and the batch size as $2; every round passes
// channelMonitorPruneBatchSize as the size. The cutoff is always a time.Time: the
// history query compares it as-is, the rollup query casts it with ::date in SQL.
// Each round is a separate ExecContext on db; no explicit transaction is opened here.
// On error the rows deleted by earlier rounds are still reported in the first return
// value.
func deleteChannelMonitorBatched(ctx context.Context, db *sql.DB, query string, cutoff time.Time) (int64, error) {
	var deleted int64
	for {
		result, err := db.ExecContext(ctx, query, cutoff, channelMonitorPruneBatchSize)
		if err != nil {
			return deleted, fmt.Errorf("channel_monitor prune batch: %w", err)
		}
		n, err := result.RowsAffected()
		if err != nil {
			return deleted, fmt.Errorf("channel_monitor prune rows affected: %w", err)
		}
		if n == 0 {
			// Nothing matched the cutoff any more: pruning is complete.
			return deleted, nil
		}
		deleted += n
	}
}
// LoadAggregationWatermark reads last_aggregated_date from the single watermark row
// (id = 1). The watermark table is accessed with raw SQL; per the original note it is
// not part of the ent schema.
//
// Returns (nil, nil) when the row does not exist or the column is NULL, leaving the
// first-run backfill strategy to the caller.
func (r *channelMonitorRepository) LoadAggregationWatermark(ctx context.Context) (*time.Time, error) {
	const q = `SELECT last_aggregated_date FROM channel_monitor_aggregation_watermark WHERE id = 1`

	var last sql.NullTime
	err := r.db.QueryRowContext(ctx, q).Scan(&last)
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf("load aggregation watermark: %w", err)
	case !last.Valid:
		return nil, nil
	}
	return &last.Time, nil
}
// UpdateAggregationWatermark stores date as the new watermark by upserting the row
// with id = 1 and refreshing updated_at.
//
// The value is cast with $1::date in SQL so it lines up with the last_aggregated_date
// column.
// NOTE(review): the original note says the cast truncates to the UTC date and that
// the column is of type DATE; neither is visible in this file — confirm callers pass
// UTC.
func (r *channelMonitorRepository) UpdateAggregationWatermark(ctx context.Context, date time.Time) error {
	const q = `
INSERT INTO channel_monitor_aggregation_watermark (id, last_aggregated_date, updated_at)
VALUES (1, $1::date, NOW())
ON CONFLICT (id) DO UPDATE SET
last_aggregated_date = EXCLUDED.last_aggregated_date,
updated_at = NOW()
`
	_, err := r.db.ExecContext(ctx, q, date)
	if err != nil {
		return fmt.Errorf("update aggregation watermark: %w", err)
	}
	return nil
}
// ---------- helpers ----------
// entToServiceMonitor converts an ent ChannelMonitor row into the service-layer model.
//
// A nil row yields nil. ExtraModels is normalised to a non-nil slice. APIKey carries
// the stored ciphertext unchanged; decrypting it is left to the service layer.
func entToServiceMonitor(row *dbent.ChannelMonitor) *service.ChannelMonitor {
	if row == nil {
		return nil
	}

	monitor := new(service.ChannelMonitor)
	monitor.ID = row.ID
	monitor.Name = row.Name
	monitor.Provider = string(row.Provider)
	monitor.Endpoint = row.Endpoint
	monitor.APIKey = row.APIKeyEncrypted // still ciphertext at this point
	monitor.PrimaryModel = row.PrimaryModel
	monitor.ExtraModels = emptySliceIfNil(row.ExtraModels)
	monitor.GroupName = row.GroupName
	monitor.Enabled = row.Enabled
	monitor.IntervalSeconds = row.IntervalSeconds
	monitor.LastCheckedAt = row.LastCheckedAt
	monitor.CreatedBy = row.CreatedBy
	monitor.CreatedAt = row.CreatedAt
	monitor.UpdatedAt = row.UpdatedAt
	return monitor
}
// emptySliceIfNil returns in unchanged when it is non-nil and a fresh empty slice
// otherwise, so callers always hand a non-nil []string to the persistence layer.
func emptySliceIfNil(in []string) []string {
	if in != nil {
		return in
	}
	return []string{}
}