Problem:
Upstream channels can reject monitor probes based on client fingerprint
(e.g. "only Claude Code clients allowed"). The monitor had no way to
customize the outgoing request to bypass such restrictions.
Solution:
Introduce reusable request templates that carry extra_headers plus an
optional body override; monitors reference a template and receive a
snapshot copy on apply. Template edits do NOT auto-propagate — users
must click "apply to associated monitors" to refresh snapshots, so a
bad template edit cannot instantly break all production monitors.
Data model (migration 112):
- channel_monitor_request_templates: id, name, provider, description,
extra_headers jsonb, body_override_mode ('off'|'merge'|'replace'),
body_override jsonb. Unique (provider, name).
- channel_monitors: +template_id (FK, ON DELETE SET NULL), +extra_headers,
+body_override_mode, +body_override (the three runtime snapshot fields;
see the Go sketch below).
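For orientation, a rough Go sketch of the two shapes migration 112 introduces
(field names follow the columns above; the actual ent-generated structs may
differ):

    // Template rows are the editable source of truth.
    type RequestTemplate struct {
        ID               int64
        Name             string            // unique per (provider, name)
        Provider         string
        Description      string
        ExtraHeaders     map[string]string
        BodyOverrideMode string            // "off" | "merge" | "replace"
        BodyOverride     map[string]any
    }

    // Runtime snapshot carried by each monitor; template_id only records
    // lineage, the three copied fields are what the checker reads.
    type MonitorSnapshot struct {
        TemplateID       *int64 // FK, ON DELETE SET NULL
        ExtraHeaders     map[string]string
        BodyOverrideMode string
        BodyOverride     map[string]any
    }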
Checker (channel_monitor_checker.go):
- callProvider + runCheckForModel accept a CheckOptions carrying the
snapshot fields. mergeHeaders applies user headers on top of adapter
defaults (forbidden list: Host / Content-Length / Transfer-Encoding /
Connection / Content-Encoding).
- buildRequestBody (sketched after this list):
off -> adapter default body
merge -> shallow-merge over default; per-provider deny list
(model/messages/contents) protects the challenge contract
replace -> user body verbatim
- Replace mode skips challenge validation; instead HTTP 2xx + non-empty
extracted response text = operational, empty = failed.
- 4 new unit tests cover all three modes + replace/empty-response case.
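A minimal sketch of the header-merge and body-override semantics above
(assumed shapes; the real code lives in channel_monitor_checker.go and takes
a CheckOptions):

    // Transport-level headers that user overrides may never touch.
    var forbiddenHeaders = map[string]bool{
        "Host": true, "Content-Length": true, "Transfer-Encoding": true,
        "Connection": true, "Content-Encoding": true,
    }

    // mergeHeaders layers user headers on top of adapter defaults and drops
    // forbidden keys.
    func mergeHeaders(defaults, user map[string]string) map[string]string {
        out := make(map[string]string, len(defaults)+len(user))
        for k, v := range defaults {
            out[k] = v
        }
        for k, v := range user {
            if forbiddenHeaders[k] {
                continue
            }
            out[k] = v
        }
        return out
    }

    // buildRequestBody: off = adapter default, merge = shallow merge minus
    // denied keys (e.g. model/messages/contents), replace = user body verbatim.
    func buildRequestBody(mode string, def, override map[string]any, deny map[string]bool) map[string]any {
        switch mode {
        case "merge":
            out := make(map[string]any, len(def)+len(override))
            for k, v := range def {
                out[k] = v
            }
            for k, v := range override {
                if !deny[k] {
                    out[k] = v
                }
            }
            return out
        case "replace":
            return override
        default: // "off"
            return def
        }
    }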
Admin API:
- /admin/channel-monitor-templates CRUD + /:id/apply (overwrites the snapshot
on all template_id=id monitors and returns the affected count; see the
sketch below).
- channel_monitor request/response DTOs gain the 4 new fields.
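The apply semantics (overwrite, never merge) in sketch form, as a hypothetical
helper over the types sketched above; the real handler goes through the
service/repository layers:

    // applyTemplate copies the template's request fields onto every monitor
    // referencing it and returns the affected count. After apply, each
    // snapshot equals the template verbatim.
    func applyTemplate(t RequestTemplate, monitors []*MonitorSnapshot) int {
        affected := 0
        for _, m := range monitors {
            if m.TemplateID == nil || *m.TemplateID != t.ID {
                continue
            }
            m.ExtraHeaders = t.ExtraHeaders
            m.BodyOverrideMode = t.BodyOverrideMode
            m.BodyOverride = t.BodyOverride
            affected++
        }
        return affected
    }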
Frontend:
- channelMonitorTemplate.ts API client.
- MonitorAdvancedRequestConfig.vue shared component for headers textarea
+ body mode radio + body JSON editor; used by both template and monitor
forms.
- MonitorTemplateManagerDialog.vue: provider tabs, list/create/edit/
delete/apply, live "associated monitors" count per row.
- MonitorFiltersBar: new "模板管理" (template management) button next to
"新增监控" (add monitor).
- MonitorFormDialog: collapsible "高级" (advanced) section with a template
dropdown (filtered by form.provider, cleared on provider change) + embedded
AdvancedRequestConfig. Picking a template copies its fields into the form
(snapshot semantics mirrored on the client).
- i18n zh/en entries for all new copy.
chore: bump version to 0.1.114.32
package repository

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"

	dbent "github.com/Wei-Shaw/sub2api/ent"
	"github.com/Wei-Shaw/sub2api/ent/channelmonitor"
	"github.com/Wei-Shaw/sub2api/ent/channelmonitorhistory"
	"github.com/Wei-Shaw/sub2api/internal/service"
	"github.com/lib/pq"
)

// channelMonitorRepository implements service.ChannelMonitorRepository.
//
// Design notes:
//   - CRUD goes through ent, reusing the project's transactional context support
//   - aggregate queries (latest per model / availability) use raw SQL, avoiding
//     ent's GROUP BY boilerplate and guaranteeing the indexes can actually be hit
type channelMonitorRepository struct {
	client *dbent.Client
	db     *sql.DB
}

// NewChannelMonitorRepository creates a repository instance.
func NewChannelMonitorRepository(client *dbent.Client, db *sql.DB) service.ChannelMonitorRepository {
	return &channelMonitorRepository{client: client, db: db}
}

// ---------- CRUD ----------

func (r *channelMonitorRepository) Create(ctx context.Context, m *service.ChannelMonitor) error {
	client := clientFromContext(ctx, r.client)
	builder := client.ChannelMonitor.Create().
		SetName(m.Name).
		SetProvider(channelmonitor.Provider(m.Provider)).
		SetEndpoint(m.Endpoint).
		SetAPIKeyEncrypted(m.APIKey). // the caller passes in ciphertext already
		SetPrimaryModel(m.PrimaryModel).
		SetExtraModels(emptySliceIfNil(m.ExtraModels)).
		SetGroupName(m.GroupName).
		SetEnabled(m.Enabled).
		SetIntervalSeconds(m.IntervalSeconds).
		SetCreatedBy(m.CreatedBy).
		SetExtraHeaders(emptyHeadersIfNilRepo(m.ExtraHeaders)).
		SetBodyOverrideMode(defaultBodyModeRepo(m.BodyOverrideMode))
	if m.TemplateID != nil {
		builder = builder.SetTemplateID(*m.TemplateID)
	}
	if m.BodyOverride != nil {
		builder = builder.SetBodyOverride(m.BodyOverride)
	}

	created, err := builder.Save(ctx)
	if err != nil {
		return translatePersistenceError(err, service.ErrChannelMonitorNotFound, nil)
	}
	m.ID = created.ID
	m.CreatedAt = created.CreatedAt
	m.UpdatedAt = created.UpdatedAt
	return nil
}

func (r *channelMonitorRepository) GetByID(ctx context.Context, id int64) (*service.ChannelMonitor, error) {
	row, err := r.client.ChannelMonitor.Query().
		Where(channelmonitor.IDEQ(id)).
		Only(ctx)
	if err != nil {
		return nil, translatePersistenceError(err, service.ErrChannelMonitorNotFound, nil)
	}
	return entToServiceMonitor(row), nil
}

func (r *channelMonitorRepository) Update(ctx context.Context, m *service.ChannelMonitor) error {
	client := clientFromContext(ctx, r.client)
	updater := client.ChannelMonitor.UpdateOneID(m.ID).
		SetName(m.Name).
		SetProvider(channelmonitor.Provider(m.Provider)).
		SetEndpoint(m.Endpoint).
		SetAPIKeyEncrypted(m.APIKey).
		SetPrimaryModel(m.PrimaryModel).
		SetExtraModels(emptySliceIfNil(m.ExtraModels)).
		SetGroupName(m.GroupName).
		SetEnabled(m.Enabled).
		SetIntervalSeconds(m.IntervalSeconds).
		SetExtraHeaders(emptyHeadersIfNilRepo(m.ExtraHeaders)).
		SetBodyOverrideMode(defaultBodyModeRepo(m.BodyOverrideMode))
	if m.TemplateID != nil {
		updater = updater.SetTemplateID(*m.TemplateID)
	} else {
		updater = updater.ClearTemplateID()
	}
	if m.BodyOverride != nil {
		updater = updater.SetBodyOverride(m.BodyOverride)
	} else {
		updater = updater.ClearBodyOverride()
	}

	updated, err := updater.Save(ctx)
	if err != nil {
		return translatePersistenceError(err, service.ErrChannelMonitorNotFound, nil)
	}
	m.UpdatedAt = updated.UpdatedAt
	return nil
}

func (r *channelMonitorRepository) Delete(ctx context.Context, id int64) error {
	client := clientFromContext(ctx, r.client)
	if err := client.ChannelMonitor.DeleteOneID(id).Exec(ctx); err != nil {
		return translatePersistenceError(err, service.ErrChannelMonitorNotFound, nil)
	}
	return nil
}

func (r *channelMonitorRepository) List(ctx context.Context, params service.ChannelMonitorListParams) ([]*service.ChannelMonitor, int64, error) {
	q := r.client.ChannelMonitor.Query()
	if params.Provider != "" {
		q = q.Where(channelmonitor.ProviderEQ(channelmonitor.Provider(params.Provider)))
	}
	if params.Enabled != nil {
		q = q.Where(channelmonitor.EnabledEQ(*params.Enabled))
	}
	if s := strings.TrimSpace(params.Search); s != "" {
		q = q.Where(channelmonitor.Or(
			channelmonitor.NameContainsFold(s),
			channelmonitor.GroupNameContainsFold(s),
			channelmonitor.PrimaryModelContainsFold(s),
		))
	}

	total, err := q.Count(ctx)
	if err != nil {
		return nil, 0, fmt.Errorf("count monitors: %w", err)
	}

	pageSize := params.PageSize
	if pageSize <= 0 {
		pageSize = 20
	}
	page := params.Page
	if page <= 0 {
		page = 1
	}

	rows, err := q.
		Order(dbent.Desc(channelmonitor.FieldID)).
		Offset((page - 1) * pageSize).
		Limit(pageSize).
		All(ctx)
	if err != nil {
		return nil, 0, fmt.Errorf("list monitors: %w", err)
	}

	out := make([]*service.ChannelMonitor, 0, len(rows))
	for _, row := range rows {
		out = append(out, entToServiceMonitor(row))
	}
	return out, int64(total), nil
}

// ---------- scheduler helpers ----------

func (r *channelMonitorRepository) ListEnabled(ctx context.Context) ([]*service.ChannelMonitor, error) {
	rows, err := r.client.ChannelMonitor.Query().
		Where(channelmonitor.EnabledEQ(true)).
		All(ctx)
	if err != nil {
		return nil, fmt.Errorf("list enabled monitors: %w", err)
	}
	out := make([]*service.ChannelMonitor, 0, len(rows))
	for _, row := range rows {
		out = append(out, entToServiceMonitor(row))
	}
	return out, nil
}

func (r *channelMonitorRepository) MarkChecked(ctx context.Context, id int64, checkedAt time.Time) error {
	client := clientFromContext(ctx, r.client)
	if err := client.ChannelMonitor.UpdateOneID(id).
		SetLastCheckedAt(checkedAt).
		Exec(ctx); err != nil {
		return translatePersistenceError(err, service.ErrChannelMonitorNotFound, nil)
	}
	return nil
}

func (r *channelMonitorRepository) InsertHistoryBatch(ctx context.Context, rows []*service.ChannelMonitorHistoryRow) error {
	if len(rows) == 0 {
		return nil
	}
	client := clientFromContext(ctx, r.client)
	bulk := make([]*dbent.ChannelMonitorHistoryCreate, 0, len(rows))
	for _, row := range rows {
		c := client.ChannelMonitorHistory.Create().
			SetMonitorID(row.MonitorID).
			SetModel(row.Model).
			SetStatus(channelmonitorhistory.Status(row.Status)).
			SetMessage(row.Message).
			SetCheckedAt(row.CheckedAt)
		if row.LatencyMs != nil {
			c = c.SetLatencyMs(*row.LatencyMs)
		}
		if row.PingLatencyMs != nil {
			c = c.SetPingLatencyMs(*row.PingLatencyMs)
		}
		bulk = append(bulk, c)
	}
	if _, err := client.ChannelMonitorHistory.CreateBulk(bulk...).Save(ctx); err != nil {
		return fmt.Errorf("insert history bulk: %w", err)
	}
	return nil
}

// DeleteHistoryBefore physically deletes history rows with checked_at < before,
// channelMonitorPruneBatchSize rows per batch, so no single transaction deletes
// enough to cause lock/WAL pressure. The (checked_at) index locates each small
// batch of ids, which are then deleted by id.
func (r *channelMonitorRepository) DeleteHistoryBefore(ctx context.Context, before time.Time) (int64, error) {
	return deleteChannelMonitorBatched(ctx, r.db, channelMonitorPruneHistorySQL, before)
}

// ListHistory returns a monitor's most recent N history entries ordered by
// checked_at descending. An empty model applies no filter; a non-empty model
// returns only that model's entries.
func (r *channelMonitorRepository) ListHistory(ctx context.Context, monitorID int64, model string, limit int) ([]*service.ChannelMonitorHistoryEntry, error) {
	q := r.client.ChannelMonitorHistory.Query().
		Where(channelmonitorhistory.MonitorIDEQ(monitorID))
	if strings.TrimSpace(model) != "" {
		q = q.Where(channelmonitorhistory.ModelEQ(model))
	}
	rows, err := q.
		Order(dbent.Desc(channelmonitorhistory.FieldCheckedAt)).
		Limit(limit).
		All(ctx)
	if err != nil {
		return nil, fmt.Errorf("list history: %w", err)
	}
	out := make([]*service.ChannelMonitorHistoryEntry, 0, len(rows))
	for _, row := range rows {
		entry := &service.ChannelMonitorHistoryEntry{
			ID:            row.ID,
			Model:         row.Model,
			Status:        string(row.Status),
			LatencyMs:     row.LatencyMs,
			PingLatencyMs: row.PingLatencyMs,
			Message:       row.Message,
			CheckedAt:     row.CheckedAt,
		}
		out = append(out, entry)
	}
	return out, nil
}

// ---------- user-view aggregation (raw SQL) ----------

// ListLatestPerModel uses DISTINCT ON to fetch the most recent record for each
// model of the given monitor. The (monitor_id, model, checked_at DESC) index
// lets this run as an Index Scan.
func (r *channelMonitorRepository) ListLatestPerModel(ctx context.Context, monitorID int64) ([]*service.ChannelMonitorLatest, error) {
	const q = `
SELECT DISTINCT ON (model)
       model, status, latency_ms, ping_latency_ms, checked_at
FROM channel_monitor_histories
WHERE monitor_id = $1
ORDER BY model, checked_at DESC
`
	rows, err := r.db.QueryContext(ctx, q, monitorID)
	if err != nil {
		return nil, fmt.Errorf("query latest per model: %w", err)
	}
	defer func() { _ = rows.Close() }()

	out := make([]*service.ChannelMonitorLatest, 0)
	for rows.Next() {
		l := &service.ChannelMonitorLatest{}
		var latency, ping sql.NullInt64
		if err := rows.Scan(&l.Model, &l.Status, &latency, &ping, &l.CheckedAt); err != nil {
			return nil, fmt.Errorf("scan latest row: %w", err)
		}
		assignNullInt(&l.LatencyMs, latency)
		assignNullInt(&l.PingLatencyMs, ping)
		out = append(out, l)
	}
	return out, rows.Err()
}

// assignNullInt unpacks a sql.NullInt64 into an *int target (a new int is
// allocated only when valid). Centralizing this avoids repeating the
// if latency.Valid { v := int(...) ... } boilerplate for latency and ping.
func assignNullInt(dst **int, n sql.NullInt64) {
	if !n.Valid {
		return
	}
	v := int(n.Int64)
	*dst = &v
}

// ComputeAvailability computes each model's availability and average latency
// within the given window.
// "Available" = status IN (operational, degraded).
//
// Data sources: the history table only retains 1 day; the window's earlier
// days come from the rollup table.
//   - raw = today's (from CURRENT_DATE) non-soft-deleted history rows,
//     accumulated per model
//   - rollup = aggregated rows in [CURRENT_DATE - windowDays, CURRENT_DATE)
//
// The total window is "today + the past windowDays days", one day wider than
// the literal windowDays, but since rollups are cut on whole UTC days this is
// an unavoidable precision loss of aggregation, and it errs wide rather than
// narrow (more data, not less).
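// A worked example of the window math (illustrative values, not from the
// original source): with windowDays = 7 and CURRENT_DATE = 2026-05-04 (UTC),
// rollup covers bucket_date in [2026-04-27, 2026-05-04) and raw covers
// checked_at >= 2026-05-04, i.e. 8 calendar days in total.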
func (r *channelMonitorRepository) ComputeAvailability(ctx context.Context, monitorID int64, windowDays int) ([]*service.ChannelMonitorAvailability, error) {
	if windowDays <= 0 {
		windowDays = 7
	}
	const q = `
WITH raw AS (
    SELECT model,
           COUNT(*) AS total_checks,
           COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_count,
           COALESCE(SUM(latency_ms) FILTER (WHERE latency_ms IS NOT NULL), 0) AS sum_latency_ms,
           COUNT(latency_ms) AS count_latency
    FROM channel_monitor_histories
    WHERE monitor_id = $1
      AND checked_at >= CURRENT_DATE
    GROUP BY model
),
rollup AS (
    SELECT model, total_checks, ok_count, sum_latency_ms, count_latency
    FROM channel_monitor_daily_rollups
    WHERE monitor_id = $1
      AND bucket_date >= (CURRENT_DATE - $2::int)
      AND bucket_date < CURRENT_DATE
)
SELECT model,
       SUM(total_checks) AS total,
       SUM(ok_count) AS ok,
       CASE WHEN SUM(count_latency) > 0
            THEN SUM(sum_latency_ms)::float8 / SUM(count_latency)
            ELSE NULL END AS avg_latency_ms
FROM (SELECT * FROM raw UNION ALL SELECT * FROM rollup) combined
GROUP BY model
`
	rows, err := r.db.QueryContext(ctx, q, monitorID, windowDays)
	if err != nil {
		return nil, fmt.Errorf("query availability: %w", err)
	}
	defer func() { _ = rows.Close() }()

	out := make([]*service.ChannelMonitorAvailability, 0)
	for rows.Next() {
		row, err := scanAvailabilityRow(rows, windowDays)
		if err != nil {
			return nil, err
		}
		out = append(out, row)
	}
	return out, rows.Err()
}

// scanAvailabilityRow scans a single (model, total, ok, avg_latency) row into
// a ChannelMonitorAvailability. It only serves ComputeAvailability (4 columns);
// the batch version has an extra monitor_id column and calls
// finalizeAvailabilityRow inline instead.
func scanAvailabilityRow(rows interface{ Scan(...any) error }, windowDays int) (*service.ChannelMonitorAvailability, error) {
	row := &service.ChannelMonitorAvailability{WindowDays: windowDays}
	var avgLatency sql.NullFloat64
	if err := rows.Scan(&row.Model, &row.TotalChecks, &row.OperationalChecks, &avgLatency); err != nil {
		return nil, fmt.Errorf("scan availability row: %w", err)
	}
	finalizeAvailabilityRow(row, avgLatency)
	return row, nil
}

// finalizeAvailabilityRow derives the availability percentage from
// OperationalChecks/TotalChecks and unpacks the sql.NullFloat64 average
// latency into an *int. Shared by both call sites to avoid maintenance drift.
func finalizeAvailabilityRow(row *service.ChannelMonitorAvailability, avgLatency sql.NullFloat64) {
	if row.TotalChecks > 0 {
		row.AvailabilityPct = float64(row.OperationalChecks) * 100.0 / float64(row.TotalChecks)
	}
	if avgLatency.Valid {
		v := int(avgLatency.Float64)
		row.AvgLatencyMs = &v
	}
}

// ListLatestForMonitorIDs queries the most recent record per (monitor_id, model)
// for multiple monitors in one round trip. It relies on PG's DISTINCT ON; the
// (monitor_id, model, checked_at DESC) index lets it run as an Index Scan.
func (r *channelMonitorRepository) ListLatestForMonitorIDs(ctx context.Context, ids []int64) (map[int64][]*service.ChannelMonitorLatest, error) {
	out := make(map[int64][]*service.ChannelMonitorLatest, len(ids))
	if len(ids) == 0 {
		return out, nil
	}
	const q = `
SELECT DISTINCT ON (monitor_id, model)
       monitor_id, model, status, latency_ms, ping_latency_ms, checked_at
FROM channel_monitor_histories
WHERE monitor_id = ANY($1)
ORDER BY monitor_id, model, checked_at DESC
`
	rows, err := r.db.QueryContext(ctx, q, pq.Array(ids))
	if err != nil {
		return nil, fmt.Errorf("query latest batch: %w", err)
	}
	defer func() { _ = rows.Close() }()

	for rows.Next() {
		var monitorID int64
		l := &service.ChannelMonitorLatest{}
		var latency, ping sql.NullInt64
		if err := rows.Scan(&monitorID, &l.Model, &l.Status, &latency, &ping, &l.CheckedAt); err != nil {
			return nil, fmt.Errorf("scan latest batch row: %w", err)
		}
		assignNullInt(&l.LatencyMs, latency)
		assignNullInt(&l.PingLatencyMs, ping)
		out[monitorID] = append(out[monitorID], l)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return out, nil
}

// ListRecentHistoryForMonitors batch-fetches each monitor's most recent N
// history entries for its designated model (checked_at DESC, newest first).
// primaryModels[monitorID] names the model to filter on for that monitor;
// monitors absent from primaryModels contribute no rows.
// A CTE + unnest over two parallel int8/text arrays builds the
// (monitor_id, model) allowlist, and ROW_NUMBER() OVER
// (PARTITION BY monitor_id) then takes each monitor's top N.
//
// Returns map[monitorID] -> []*ChannelMonitorHistoryEntry (message omitted to
// cut network overhead). Empty ids / empty primaryModels return an empty map
// without error.
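// Illustration (hypothetical values, not from the original source):
// ids = [7, 9] with primaryModels = {7: "gpt-4o", 9: "claude-sonnet-4"}
// expands in SQL to targets = (7,'gpt-4o'), (9,'claude-sonnet-4'), and each
// monitor then contributes at most perMonitorLimit rows to the result map.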
func (r *channelMonitorRepository) ListRecentHistoryForMonitors(
	ctx context.Context,
	ids []int64,
	primaryModels map[int64]string,
	perMonitorLimit int,
) (map[int64][]*service.ChannelMonitorHistoryEntry, error) {
	out := make(map[int64][]*service.ChannelMonitorHistoryEntry, len(ids))
	pairIDs, pairModels := buildMonitorModelPairs(ids, primaryModels)
	if len(pairIDs) == 0 {
		return out, nil
	}
	perMonitorLimit = clampTimelineLimit(perMonitorLimit)

	const q = `
WITH targets AS (
    SELECT unnest($1::bigint[]) AS monitor_id,
           unnest($2::text[])   AS model
),
ranked AS (
    SELECT h.monitor_id,
           h.status,
           h.latency_ms,
           h.ping_latency_ms,
           h.checked_at,
           ROW_NUMBER() OVER (PARTITION BY h.monitor_id ORDER BY h.checked_at DESC) AS rn
    FROM channel_monitor_histories h
    JOIN targets t
      ON t.monitor_id = h.monitor_id AND t.model = h.model
)
SELECT monitor_id, status, latency_ms, ping_latency_ms, checked_at
FROM ranked
WHERE rn <= $3
ORDER BY monitor_id, checked_at DESC
`
	rows, err := r.db.QueryContext(ctx, q, pq.Array(pairIDs), pq.Array(pairModels), perMonitorLimit)
	if err != nil {
		return nil, fmt.Errorf("query recent history batch: %w", err)
	}
	defer func() { _ = rows.Close() }()

	for rows.Next() {
		var monitorID int64
		entry := &service.ChannelMonitorHistoryEntry{}
		var latency, ping sql.NullInt64
		if err := rows.Scan(&monitorID, &entry.Status, &latency, &ping, &entry.CheckedAt); err != nil {
			return nil, fmt.Errorf("scan recent history row: %w", err)
		}
		assignNullInt(&entry.LatencyMs, latency)
		assignNullInt(&entry.PingLatencyMs, ping)
		out[monitorID] = append(out[monitorID], entry)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return out, nil
}

// buildMonitorModelPairs filters ids down to valid (monitor_id, model) pairs,
// skipping empty models. The two returned slices are guaranteed equal-length
// and pairwise aligned, ready for unnest expansion.
func buildMonitorModelPairs(ids []int64, primaryModels map[int64]string) ([]int64, []string) {
	if len(ids) == 0 || len(primaryModels) == 0 {
		return nil, nil
	}
	pairIDs := make([]int64, 0, len(ids))
	pairModels := make([]string, 0, len(ids))
	for _, id := range ids {
		model, ok := primaryModels[id]
		if !ok || strings.TrimSpace(model) == "" {
			continue
		}
		pairIDs = append(pairIDs, id)
		pairModels = append(pairModels, model)
	}
	return pairIDs, pairModels
}

// timelineLimit* bound the clamp range for perMonitorLimit in batch timeline
// queries. The lower bound of 1 means at least the latest entry is returned;
// the upper bound of 200 caps the response size and SQL memory use
// (the ROW_NUMBER window limit).
const (
	timelineLimitMin = 1
	timelineLimitMax = 200
)

// clampTimelineLimit clamps perMonitorLimit to [timelineLimitMin, timelineLimitMax],
// guarding against invalid values and oversized queries.
func clampTimelineLimit(n int) int {
	if n < timelineLimitMin {
		return timelineLimitMin
	}
	if n > timelineLimitMax {
		return timelineLimitMax
	}
	return n
}

// ComputeAvailabilityForMonitors computes per-model availability and average
// latency for multiple monitors within a window in a single query. Structurally
// identical to the single-monitor version: raw history covers only today,
// earlier days come from the rollup table, merged via UNION.
func (r *channelMonitorRepository) ComputeAvailabilityForMonitors(ctx context.Context, ids []int64, windowDays int) (map[int64][]*service.ChannelMonitorAvailability, error) {
	out := make(map[int64][]*service.ChannelMonitorAvailability, len(ids))
	if len(ids) == 0 {
		return out, nil
	}
	if windowDays <= 0 {
		windowDays = 7
	}
	const q = `
WITH raw AS (
    SELECT monitor_id,
           model,
           COUNT(*) AS total_checks,
           COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_count,
           COALESCE(SUM(latency_ms) FILTER (WHERE latency_ms IS NOT NULL), 0) AS sum_latency_ms,
           COUNT(latency_ms) AS count_latency
    FROM channel_monitor_histories
    WHERE monitor_id = ANY($1)
      AND checked_at >= CURRENT_DATE
    GROUP BY monitor_id, model
),
rollup AS (
    SELECT monitor_id, model, total_checks, ok_count, sum_latency_ms, count_latency
    FROM channel_monitor_daily_rollups
    WHERE monitor_id = ANY($1)
      AND bucket_date >= (CURRENT_DATE - $2::int)
      AND bucket_date < CURRENT_DATE
)
SELECT monitor_id,
       model,
       SUM(total_checks) AS total,
       SUM(ok_count) AS ok,
       CASE WHEN SUM(count_latency) > 0
            THEN SUM(sum_latency_ms)::float8 / SUM(count_latency)
            ELSE NULL END AS avg_latency_ms
FROM (SELECT * FROM raw UNION ALL SELECT * FROM rollup) combined
GROUP BY monitor_id, model
`
	rows, err := r.db.QueryContext(ctx, q, pq.Array(ids), windowDays)
	if err != nil {
		return nil, fmt.Errorf("query availability batch: %w", err)
	}
	defer func() { _ = rows.Close() }()

	for rows.Next() {
		var monitorID int64
		row := &service.ChannelMonitorAvailability{WindowDays: windowDays}
		var avgLatency sql.NullFloat64
		if err := rows.Scan(&monitorID, &row.Model, &row.TotalChecks, &row.OperationalChecks, &avgLatency); err != nil {
			return nil, fmt.Errorf("scan availability batch row: %w", err)
		}
		// The batch query adds a leading monitor_id column; the remaining
		// availability / average-latency conversion matches the single-monitor
		// version, so finalizeAvailabilityRow is reused to keep the division
		// and NullFloat unpacking in one place.
		finalizeAvailabilityRow(row, avgLatency)
		out[monitorID] = append(out[monitorID], row)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return out, nil
}

// ---------- rollup maintenance ----------

// UpsertDailyRollupsFor aggregates the day's history rows
// ([targetDate, targetDate+1d)) by (monitor_id, model, bucket_date) into
// channel_monitor_daily_rollups.
//   - ON CONFLICT (monitor_id, model, bucket_date) DO UPDATE makes backfills
//     idempotent; re-running simply overwrites with the latest statistics.
//   - $1::date lets PG truncate the argument to a UTC date, so callers need
//     not pre-process targetDate.
func (r *channelMonitorRepository) UpsertDailyRollupsFor(ctx context.Context, targetDate time.Time) (int64, error) {
	const q = `
INSERT INTO channel_monitor_daily_rollups (
    monitor_id, model, bucket_date,
    total_checks, ok_count,
    operational_count, degraded_count, failed_count, error_count,
    sum_latency_ms, count_latency,
    sum_ping_latency_ms, count_ping_latency,
    computed_at
)
SELECT
    monitor_id,
    model,
    $1::date AS bucket_date,
    COUNT(*) AS total_checks,
    COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_count,
    COUNT(*) FILTER (WHERE status = 'operational') AS operational_count,
    COUNT(*) FILTER (WHERE status = 'degraded') AS degraded_count,
    COUNT(*) FILTER (WHERE status = 'failed') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'error') AS error_count,
    COALESCE(SUM(latency_ms) FILTER (WHERE latency_ms IS NOT NULL), 0) AS sum_latency_ms,
    COUNT(latency_ms) AS count_latency,
    COALESCE(SUM(ping_latency_ms) FILTER (WHERE ping_latency_ms IS NOT NULL), 0) AS sum_ping_latency_ms,
    COUNT(ping_latency_ms) AS count_ping_latency,
    NOW()
FROM channel_monitor_histories
WHERE checked_at >= $1::date
  AND checked_at < ($1::date + INTERVAL '1 day')
GROUP BY monitor_id, model
ON CONFLICT (monitor_id, model, bucket_date) DO UPDATE SET
    total_checks        = EXCLUDED.total_checks,
    ok_count            = EXCLUDED.ok_count,
    operational_count   = EXCLUDED.operational_count,
    degraded_count      = EXCLUDED.degraded_count,
    failed_count        = EXCLUDED.failed_count,
    error_count         = EXCLUDED.error_count,
    sum_latency_ms      = EXCLUDED.sum_latency_ms,
    count_latency       = EXCLUDED.count_latency,
    sum_ping_latency_ms = EXCLUDED.sum_ping_latency_ms,
    count_ping_latency  = EXCLUDED.count_ping_latency,
    computed_at         = NOW()
`
	res, err := r.db.ExecContext(ctx, q, targetDate)
	if err != nil {
		return 0, fmt.Errorf("upsert daily rollups for %s: %w", targetDate.Format("2006-01-02"), err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("rows affected (upsert rollups): %w", err)
	}
	return n, nil
}

// DeleteRollupsBefore physically deletes rollup rows with bucket_date < beforeDate,
// batched the same way.
func (r *channelMonitorRepository) DeleteRollupsBefore(ctx context.Context, beforeDate time.Time) (int64, error) {
	return deleteChannelMonitorBatched(ctx, r.db, channelMonitorPruneRollupSQL, beforeDate)
}

// channelMonitorPruneBatchSize caps each delete batch. The 5000 matches
// ops_cleanup_service; deleting big tables in small id batches avoids long
// transactions and WAL buildup.
const channelMonitorPruneBatchSize = 5000

// channelMonitorPruneHistorySQL physically deletes expired history rows in batches.
const channelMonitorPruneHistorySQL = `
WITH batch AS (
    SELECT id FROM channel_monitor_histories
    WHERE checked_at < $1
    ORDER BY id
    LIMIT $2
)
DELETE FROM channel_monitor_histories
WHERE id IN (SELECT id FROM batch)
`

// channelMonitorPruneRollupSQL physically deletes expired rollup rows in
// batches. bucket_date needs the ::date cast so the comparison matches the
// DATE column.
const channelMonitorPruneRollupSQL = `
WITH batch AS (
    SELECT id FROM channel_monitor_daily_rollups
    WHERE bucket_date < $1::date
    ORDER BY id
    LIMIT $2
)
DELETE FROM channel_monitor_daily_rollups
WHERE id IN (SELECT id FROM batch)
`

// deleteChannelMonitorBatched runs the batched DELETE in a loop until a batch
// affects 0 rows, returning the cumulative number of deleted rows. The caller
// passes cutoff to match the column type (history: time.Time against
// TIMESTAMPTZ; rollup: time.Time cast with ::date on the SQL side).
func deleteChannelMonitorBatched(ctx context.Context, db *sql.DB, query string, cutoff time.Time) (int64, error) {
	var total int64
	for {
		res, err := db.ExecContext(ctx, query, cutoff, channelMonitorPruneBatchSize)
		if err != nil {
			return total, fmt.Errorf("channel_monitor prune batch: %w", err)
		}
		affected, err := res.RowsAffected()
		if err != nil {
			return total, fmt.Errorf("channel_monitor prune rows affected: %w", err)
		}
		total += affected
		if affected == 0 {
			break
		}
	}
	return total, nil
}

// LoadAggregationWatermark reads the watermark table (id=1).
// The watermark table is not an ent schema (it holds a single row), so raw SQL
// is used directly.
//   - Missing row or last_aggregated_date IS NULL: returns (nil, nil); the
//     caller decides the first-backfill strategy.
func (r *channelMonitorRepository) LoadAggregationWatermark(ctx context.Context) (*time.Time, error) {
	const q = `SELECT last_aggregated_date FROM channel_monitor_aggregation_watermark WHERE id = 1`
	var t sql.NullTime
	if err := r.db.QueryRowContext(ctx, q).Scan(&t); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, fmt.Errorf("load aggregation watermark: %w", err)
	}
	if !t.Valid {
		return nil, nil
	}
	return &t.Time, nil
}

// UpdateAggregationWatermark updates the watermark (UPSERT to id=1).
// $1::date lets PG truncate the argument to a UTC date, matching the DATE type
// of the last_aggregated_date column.
func (r *channelMonitorRepository) UpdateAggregationWatermark(ctx context.Context, date time.Time) error {
	const q = `
INSERT INTO channel_monitor_aggregation_watermark (id, last_aggregated_date, updated_at)
VALUES (1, $1::date, NOW())
ON CONFLICT (id) DO UPDATE SET
    last_aggregated_date = EXCLUDED.last_aggregated_date,
    updated_at           = NOW()
`
	if _, err := r.db.ExecContext(ctx, q, date); err != nil {
		return fmt.Errorf("update aggregation watermark: %w", err)
	}
	return nil
}

// ---------- helpers ----------

func entToServiceMonitor(row *dbent.ChannelMonitor) *service.ChannelMonitor {
	if row == nil {
		return nil
	}
	extras := row.ExtraModels
	if extras == nil {
		extras = []string{}
	}
	headers := row.ExtraHeaders
	if headers == nil {
		headers = map[string]string{}
	}
	out := &service.ChannelMonitor{
		ID:               row.ID,
		Name:             row.Name,
		Provider:         string(row.Provider),
		Endpoint:         row.Endpoint,
		APIKey:           row.APIKeyEncrypted, // still ciphertext; the service layer decrypts
		PrimaryModel:     row.PrimaryModel,
		ExtraModels:      extras,
		GroupName:        row.GroupName,
		Enabled:          row.Enabled,
		IntervalSeconds:  row.IntervalSeconds,
		LastCheckedAt:    row.LastCheckedAt,
		CreatedBy:        row.CreatedBy,
		CreatedAt:        row.CreatedAt,
		UpdatedAt:        row.UpdatedAt,
		ExtraHeaders:     headers,
		BodyOverrideMode: row.BodyOverrideMode,
		BodyOverride:     row.BodyOverride,
	}
	if row.TemplateID != nil {
		id := *row.TemplateID
		out.TemplateID = &id
	}
	return out
}

// emptyHeadersIfNilRepo matches service.emptyHeadersIfNil; the repo keeps its
// own copy to avoid an import cycle.
func emptyHeadersIfNilRepo(h map[string]string) map[string]string {
	if h == nil {
		return map[string]string{}
	}
	return h
}

// defaultBodyModeRepo normalizes the empty string to "off" (own copy, same
// import-cycle reason as above).
func defaultBodyModeRepo(mode string) string {
	if mode == "" {
		return "off"
	}
	return mode
}

func emptySliceIfNil(in []string) []string {
	if in == nil {
		return []string{}
	}
	return in
}