2026-01-11 16:01:35 +08:00
package repository
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/Wei-Shaw/sub2api/internal/pkg/timezone"
	"github.com/Wei-Shaw/sub2api/internal/service"
	"github.com/lib/pq"
)
// dashboardAggregationRepository maintains the pre-aggregated dashboard tables
// (usage_dashboard_hourly, usage_dashboard_daily and their *_users companions)
// by issuing SQL through an executor.
type dashboardAggregationRepository struct {
	// sql is the executor every statement runs on. RecomputeRange swaps in a
	// transaction-backed copy when this executor is a *sql.DB.
	sql sqlExecutor
}
// NewDashboardAggregationRepository 创建仪表盘预聚合仓储。
func NewDashboardAggregationRepository ( sqlDB * sql . DB ) service . DashboardAggregationRepository {
2026-01-12 10:53:41 +08:00
if sqlDB == nil {
return nil
}
if ! isPostgresDriver ( sqlDB ) {
log . Printf ( "[DashboardAggregation] 检测到非 PostgreSQL 驱动,已自动禁用预聚合" )
return nil
}
2026-01-11 16:01:35 +08:00
return newDashboardAggregationRepositoryWithSQL ( sqlDB )
}
// newDashboardAggregationRepositoryWithSQL builds a repository on top of an
// arbitrary executor. No driver or nil checks are performed here.
func newDashboardAggregationRepositoryWithSQL(sqlq sqlExecutor) *dashboardAggregationRepository {
	repo := new(dashboardAggregationRepository)
	repo.sql = sqlq
	return repo
}
2026-01-12 10:53:41 +08:00
// isPostgresDriver reports whether db is non-nil and its driver is a
// *pq.Driver.
func isPostgresDriver(db *sql.DB) bool {
	if db == nil {
		return false
	}
	switch db.Driver().(type) {
	case *pq.Driver:
		return true
	default:
		return false
	}
}
2026-01-11 16:01:35 +08:00
func ( r * dashboardAggregationRepository ) AggregateRange ( ctx context . Context , start , end time . Time ) error {
2026-01-15 11:22:13 +08:00
loc := timezone . Location ( )
startLocal := start . In ( loc )
endLocal := end . In ( loc )
if ! endLocal . After ( startLocal ) {
2026-01-11 16:01:35 +08:00
return nil
}
2026-01-15 11:22:13 +08:00
hourStart := startLocal . Truncate ( time . Hour )
hourEnd := endLocal . Truncate ( time . Hour )
if endLocal . After ( hourEnd ) {
2026-01-11 16:01:35 +08:00
hourEnd = hourEnd . Add ( time . Hour )
}
2026-01-15 11:22:13 +08:00
dayStart := truncateToDay ( startLocal )
dayEnd := truncateToDay ( endLocal )
if endLocal . After ( dayEnd ) {
2026-01-11 16:01:35 +08:00
dayEnd = dayEnd . Add ( 24 * time . Hour )
}
2026-01-11 17:21:17 +08:00
// 以桶边界聚合,允许覆盖 end 所在桶的剩余区间。
if err := r . insertHourlyActiveUsers ( ctx , hourStart , hourEnd ) ; err != nil {
2026-01-11 16:01:35 +08:00
return err
}
2026-01-11 17:21:17 +08:00
if err := r . insertDailyActiveUsers ( ctx , hourStart , hourEnd ) ; err != nil {
2026-01-11 16:01:35 +08:00
return err
}
if err := r . upsertHourlyAggregates ( ctx , hourStart , hourEnd ) ; err != nil {
2026-01-18 10:52:18 +08:00
return err
}
if err := r . upsertDailyAggregates ( ctx , dayStart , dayEnd ) ; err != nil {
return err
}
return nil
}
// RecomputeRange deletes and rebuilds every hourly and daily bucket touched by
// the interval between start and end.
//
// The interval is converted to the location returned by timezone.Location and
// widened to whole hour and whole day buckets. A nil receiver, a nil executor,
// or a range whose end is not after start is a no-op. When the executor is a
// *sql.DB the whole rebuild runs inside one transaction that is rolled back on
// the first error; any other executor runs the same steps directly.
func (r *dashboardAggregationRepository) RecomputeRange(ctx context.Context, start, end time.Time) error {
	if r == nil || r.sql == nil {
		return nil
	}

	loc := timezone.Location()
	startLocal := start.In(loc)
	endLocal := end.In(loc)
	if !endLocal.After(startLocal) {
		return nil
	}

	// NOTE(review): Time.Truncate operates on absolute time, not on the wall
	// clock, so in a zone whose UTC offset is not a whole number of hours these
	// boundaries may not coincide with the SQL date_trunc('hour', ...) buckets.
	// Confirm the supported timezones.
	hourStart := startLocal.Truncate(time.Hour)
	hourEnd := endLocal.Truncate(time.Hour)
	if endLocal.After(hourEnd) {
		hourEnd = hourEnd.Add(time.Hour)
	}

	dayStart := truncateToDay(startLocal)
	dayEnd := truncateToDay(endLocal)
	if endLocal.After(dayEnd) {
		// Step by one calendar day. Adding a fixed 24h would miss the next
		// midnight on days that are 23 or 25 hours long (DST transitions).
		dayEnd = dayEnd.AddDate(0, 0, 1)
	}

	// Prefer a transaction so the range stays consistent; fall back to
	// non-transactional execution when the executor is not a *sql.DB.
	db, ok := r.sql.(*sql.DB)
	if !ok {
		return r.recomputeRangeInTx(ctx, hourStart, hourEnd, dayStart, dayEnd)
	}

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	txRepo := newDashboardAggregationRepositoryWithSQL(tx)
	if err := txRepo.recomputeRangeInTx(ctx, hourStart, hourEnd, dayStart, dayEnd); err != nil {
		// The rebuild error is the one worth reporting; a rollback failure
		// would only mask it.
		_ = tx.Rollback()
		return err
	}
	return tx.Commit()
}
func ( r * dashboardAggregationRepository ) recomputeRangeInTx ( ctx context . Context , hourStart , hourEnd , dayStart , dayEnd time . Time ) error {
// 先清空范围内桶,再重建(避免仅增量插入导致活跃用户等指标无法回退)。
if _ , err := r . sql . ExecContext ( ctx , "DELETE FROM usage_dashboard_hourly WHERE bucket_start >= $1 AND bucket_start < $2" , hourStart , hourEnd ) ; err != nil {
return err
}
if _ , err := r . sql . ExecContext ( ctx , "DELETE FROM usage_dashboard_hourly_users WHERE bucket_start >= $1 AND bucket_start < $2" , hourStart , hourEnd ) ; err != nil {
return err
}
if _ , err := r . sql . ExecContext ( ctx , "DELETE FROM usage_dashboard_daily WHERE bucket_date >= $1::date AND bucket_date < $2::date" , dayStart , dayEnd ) ; err != nil {
return err
}
if _ , err := r . sql . ExecContext ( ctx , "DELETE FROM usage_dashboard_daily_users WHERE bucket_date >= $1::date AND bucket_date < $2::date" , dayStart , dayEnd ) ; err != nil {
return err
}
if err := r . insertHourlyActiveUsers ( ctx , hourStart , hourEnd ) ; err != nil {
return err
}
if err := r . insertDailyActiveUsers ( ctx , hourStart , hourEnd ) ; err != nil {
return err
}
if err := r . upsertHourlyAggregates ( ctx , hourStart , hourEnd ) ; err != nil {
2026-01-11 16:01:35 +08:00
return err
}
if err := r . upsertDailyAggregates ( ctx , dayStart , dayEnd ) ; err != nil {
return err
}
return nil
}
// GetAggregationWatermark returns last_aggregated_at from the watermark row
// with id = 1, converted to UTC.
//
// When that row does not exist the Unix epoch (UTC) is returned with a nil
// error. Any other failure yields the zero time and the error.
func (r *dashboardAggregationRepository) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
	query := "SELECT last_aggregated_at FROM usage_dashboard_aggregation_watermark WHERE id = 1"

	var ts time.Time
	if err := scanSingleRow(ctx, r.sql, query, nil, &ts); err != nil {
		// errors.Is also recognises sql.ErrNoRows when it arrives wrapped,
		// which a plain == comparison would miss.
		if errors.Is(err, sql.ErrNoRows) {
			return time.Unix(0, 0).UTC(), nil
		}
		return time.Time{}, err
	}
	return ts.UTC(), nil
}
// UpdateAggregationWatermark stores aggregatedAt (converted to UTC) as
// last_aggregated_at on the watermark row with id = 1, inserting the row when
// it is missing and overwriting it otherwise. updated_at is set to NOW().
func (r *dashboardAggregationRepository) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error {
	// The placeholder must be written "$1" with no inner space; PostgreSQL does
	// not accept "$ 1" as a positional parameter.
	query := `
		INSERT INTO usage_dashboard_aggregation_watermark (id, last_aggregated_at, updated_at)
		VALUES (1, $1, NOW())
		ON CONFLICT (id)
		DO UPDATE SET last_aggregated_at = EXCLUDED.last_aggregated_at, updated_at = EXCLUDED.updated_at
	`
	_, err := r.sql.ExecContext(ctx, query, aggregatedAt.UTC())
	return err
}
func ( r * dashboardAggregationRepository ) CleanupAggregates ( ctx context . Context , hourlyCutoff , dailyCutoff time . Time ) error {
2026-01-12 10:38:42 +08:00
hourlyCutoffUTC := hourlyCutoff . UTC ( )
dailyCutoffUTC := dailyCutoff . UTC ( )
if _ , err := r . sql . ExecContext ( ctx , "DELETE FROM usage_dashboard_hourly WHERE bucket_start < $1" , hourlyCutoffUTC ) ; err != nil {
return err
}
if _ , err := r . sql . ExecContext ( ctx , "DELETE FROM usage_dashboard_hourly_users WHERE bucket_start < $1" , hourlyCutoffUTC ) ; err != nil {
return err
}
if _ , err := r . sql . ExecContext ( ctx , "DELETE FROM usage_dashboard_daily WHERE bucket_date < $1::date" , dailyCutoffUTC ) ; err != nil {
return err
}
if _ , err := r . sql . ExecContext ( ctx , "DELETE FROM usage_dashboard_daily_users WHERE bucket_date < $1::date" , dailyCutoffUTC ) ; err != nil {
return err
}
return nil
2026-01-11 16:01:35 +08:00
}
// CleanupUsageLogs removes usage_logs data older than cutoff.
//
// When usage_logs is a partitioned table the work is delegated to
// dropUsageLogsPartitions; otherwise rows with created_at before cutoff (in
// UTC) are deleted directly.
func (r *dashboardAggregationRepository) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error {
	partitioned, err := r.isUsageLogsPartitioned(ctx)
	if err != nil {
		return err
	}
	if !partitioned {
		_, execErr := r.sql.ExecContext(ctx, "DELETE FROM usage_logs WHERE created_at < $1", cutoff.UTC())
		return execErr
	}
	return r.dropUsageLogsPartitions(ctx, cutoff)
}
// EnsureUsageLogsPartitions creates the usage_logs partitions for the previous,
// current and next month relative to now (months computed in UTC).
//
// It does nothing and returns nil when usage_logs is not partitioned. The first
// error from the partition check or from creating a partition is returned.
func (r *dashboardAggregationRepository) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error {
	partitioned, err := r.isUsageLogsPartitioned(ctx)
	if err != nil {
		return err
	}
	if !partitioned {
		return nil
	}

	current := truncateToMonthUTC(now)
	// Offsets -1, 0, +1 walk previous, current and next month in that order.
	for offset := -1; offset <= 1; offset++ {
		if err := r.createUsageLogsPartition(ctx, current.AddDate(0, offset, 0)); err != nil {
			return err
		}
	}
	return nil
}
func ( r * dashboardAggregationRepository ) insertHourlyActiveUsers ( ctx context . Context , start , end time . Time ) error {
2026-01-15 11:22:13 +08:00
tzName := timezone . Name ( )
2026-01-11 16:01:35 +08:00
query := `
INSERT INTO usage_dashboard_hourly_users ( bucket_start , user_id )
SELECT DISTINCT
2026-01-15 11:22:13 +08:00
date_trunc ( ' hour ' , created_at AT TIME ZONE $ 3 ) AT TIME ZONE $ 3 AS bucket_start ,
2026-01-11 16:01:35 +08:00
user_id
FROM usage_logs
WHERE created_at >= $ 1 AND created_at < $ 2
ON CONFLICT DO NOTHING
`
2026-01-15 11:22:13 +08:00
_ , err := r . sql . ExecContext ( ctx , query , start , end , tzName )
2026-01-11 16:01:35 +08:00
return err
}
func ( r * dashboardAggregationRepository ) insertDailyActiveUsers ( ctx context . Context , start , end time . Time ) error {
2026-01-15 11:22:13 +08:00
tzName := timezone . Name ( )
2026-01-11 16:01:35 +08:00
query := `
INSERT INTO usage_dashboard_daily_users ( bucket_date , user_id )
SELECT DISTINCT
2026-01-15 11:22:13 +08:00
( bucket_start AT TIME ZONE $ 3 ) : : date AS bucket_date ,
2026-01-11 16:01:35 +08:00
user_id
2026-01-11 17:21:17 +08:00
FROM usage_dashboard_hourly_users
WHERE bucket_start >= $ 1 AND bucket_start < $ 2
2026-01-11 16:01:35 +08:00
ON CONFLICT DO NOTHING
`
2026-01-15 11:22:13 +08:00
_ , err := r . sql . ExecContext ( ctx , query , start , end , tzName )
2026-01-11 16:01:35 +08:00
return err
}
func ( r * dashboardAggregationRepository ) upsertHourlyAggregates ( ctx context . Context , start , end time . Time ) error {
2026-01-15 11:22:13 +08:00
tzName := timezone . Name ( )
2026-01-11 16:01:35 +08:00
query := `
WITH hourly AS (
SELECT
2026-01-15 11:22:13 +08:00
date_trunc ( ' hour ' , created_at AT TIME ZONE $ 3 ) AT TIME ZONE $ 3 AS bucket_start ,
2026-01-11 16:01:35 +08:00
COUNT ( * ) AS total_requests ,
COALESCE ( SUM ( input_tokens ) , 0 ) AS input_tokens ,
COALESCE ( SUM ( output_tokens ) , 0 ) AS output_tokens ,
COALESCE ( SUM ( cache_creation_tokens ) , 0 ) AS cache_creation_tokens ,
COALESCE ( SUM ( cache_read_tokens ) , 0 ) AS cache_read_tokens ,
COALESCE ( SUM ( total_cost ) , 0 ) AS total_cost ,
COALESCE ( SUM ( actual_cost ) , 0 ) AS actual_cost ,
COALESCE ( SUM ( COALESCE ( duration_ms , 0 ) ) , 0 ) AS total_duration_ms
FROM usage_logs
WHERE created_at >= $ 1 AND created_at < $ 2
GROUP BY 1
) ,
user_counts AS (
SELECT bucket_start , COUNT ( * ) AS active_users
FROM usage_dashboard_hourly_users
WHERE bucket_start >= $ 1 AND bucket_start < $ 2
GROUP BY bucket_start
)
INSERT INTO usage_dashboard_hourly (
bucket_start ,
total_requests ,
input_tokens ,
output_tokens ,
cache_creation_tokens ,
cache_read_tokens ,
total_cost ,
actual_cost ,
total_duration_ms ,
active_users ,
computed_at
)
SELECT
hourly . bucket_start ,
hourly . total_requests ,
hourly . input_tokens ,
hourly . output_tokens ,
hourly . cache_creation_tokens ,
hourly . cache_read_tokens ,
hourly . total_cost ,
hourly . actual_cost ,
hourly . total_duration_ms ,
COALESCE ( user_counts . active_users , 0 ) AS active_users ,
NOW ( )
FROM hourly
LEFT JOIN user_counts ON user_counts . bucket_start = hourly . bucket_start
ON CONFLICT ( bucket_start )
DO UPDATE SET
total_requests = EXCLUDED . total_requests ,
input_tokens = EXCLUDED . input_tokens ,
output_tokens = EXCLUDED . output_tokens ,
cache_creation_tokens = EXCLUDED . cache_creation_tokens ,
cache_read_tokens = EXCLUDED . cache_read_tokens ,
total_cost = EXCLUDED . total_cost ,
actual_cost = EXCLUDED . actual_cost ,
total_duration_ms = EXCLUDED . total_duration_ms ,
active_users = EXCLUDED . active_users ,
computed_at = EXCLUDED . computed_at
`
2026-01-15 11:22:13 +08:00
_ , err := r . sql . ExecContext ( ctx , query , start , end , tzName )
2026-01-11 16:01:35 +08:00
return err
}
func ( r * dashboardAggregationRepository ) upsertDailyAggregates ( ctx context . Context , start , end time . Time ) error {
2026-01-15 11:22:13 +08:00
tzName := timezone . Name ( )
2026-01-11 16:01:35 +08:00
query := `
WITH daily AS (
SELECT
2026-01-15 11:22:13 +08:00
( bucket_start AT TIME ZONE $ 5 ) : : date AS bucket_date ,
2026-01-11 16:01:35 +08:00
COALESCE ( SUM ( total_requests ) , 0 ) AS total_requests ,
COALESCE ( SUM ( input_tokens ) , 0 ) AS input_tokens ,
COALESCE ( SUM ( output_tokens ) , 0 ) AS output_tokens ,
COALESCE ( SUM ( cache_creation_tokens ) , 0 ) AS cache_creation_tokens ,
COALESCE ( SUM ( cache_read_tokens ) , 0 ) AS cache_read_tokens ,
COALESCE ( SUM ( total_cost ) , 0 ) AS total_cost ,
COALESCE ( SUM ( actual_cost ) , 0 ) AS actual_cost ,
COALESCE ( SUM ( total_duration_ms ) , 0 ) AS total_duration_ms
FROM usage_dashboard_hourly
WHERE bucket_start >= $ 1 AND bucket_start < $ 2
2026-01-15 11:22:13 +08:00
GROUP BY ( bucket_start AT TIME ZONE $ 5 ) : : date
2026-01-11 16:01:35 +08:00
) ,
user_counts AS (
SELECT bucket_date , COUNT ( * ) AS active_users
FROM usage_dashboard_daily_users
WHERE bucket_date >= $ 3 : : date AND bucket_date < $ 4 : : date
GROUP BY bucket_date
)
INSERT INTO usage_dashboard_daily (
bucket_date ,
total_requests ,
input_tokens ,
output_tokens ,
cache_creation_tokens ,
cache_read_tokens ,
total_cost ,
actual_cost ,
total_duration_ms ,
active_users ,
computed_at
)
SELECT
daily . bucket_date ,
daily . total_requests ,
daily . input_tokens ,
daily . output_tokens ,
daily . cache_creation_tokens ,
daily . cache_read_tokens ,
daily . total_cost ,
daily . actual_cost ,
daily . total_duration_ms ,
COALESCE ( user_counts . active_users , 0 ) AS active_users ,
NOW ( )
FROM daily
LEFT JOIN user_counts ON user_counts . bucket_date = daily . bucket_date
ON CONFLICT ( bucket_date )
DO UPDATE SET
total_requests = EXCLUDED . total_requests ,
input_tokens = EXCLUDED . input_tokens ,
output_tokens = EXCLUDED . output_tokens ,
cache_creation_tokens = EXCLUDED . cache_creation_tokens ,
cache_read_tokens = EXCLUDED . cache_read_tokens ,
total_cost = EXCLUDED . total_cost ,
actual_cost = EXCLUDED . actual_cost ,
total_duration_ms = EXCLUDED . total_duration_ms ,
active_users = EXCLUDED . active_users ,
computed_at = EXCLUDED . computed_at
`
2026-01-15 11:22:13 +08:00
_ , err := r . sql . ExecContext ( ctx , query , start , end , start , end , tzName )
2026-01-11 16:01:35 +08:00
return err
}
// isUsageLogsPartitioned reports whether a relation named usage_logs is listed
// in pg_partitioned_table, i.e. whether usage_logs is a partitioned table.
func (r *dashboardAggregationRepository) isUsageLogsPartitioned(ctx context.Context) (bool, error) {
	// relname is compared exactly, so the literal must not carry padding
	// spaces; ' usage_logs ' would never match and always report false.
	query := `
		SELECT EXISTS (
			SELECT 1
			FROM pg_partitioned_table pt
			JOIN pg_class c ON c.oid = pt.partrelid
			WHERE c.relname = 'usage_logs'
		)
	`
	var partitioned bool
	if err := scanSingleRow(ctx, r.sql, query, nil, &partitioned); err != nil {
		return false, err
	}
	return partitioned, nil
}
func ( r * dashboardAggregationRepository ) dropUsageLogsPartitions ( ctx context . Context , cutoff time . Time ) error {
rows , err := r . sql . QueryContext ( ctx , `
SELECT c . relname
FROM pg_inherits
JOIN pg_class c ON c . oid = pg_inherits . inhrelid
JOIN pg_class p ON p . oid = pg_inherits . inhparent
WHERE p . relname = ' usage_logs '
` )
if err != nil {
return err
}
2026-01-11 18:39:29 +08:00
defer func ( ) {
_ = rows . Close ( )
} ( )
2026-01-11 16:01:35 +08:00
cutoffMonth := truncateToMonthUTC ( cutoff )
for rows . Next ( ) {
var name string
if err := rows . Scan ( & name ) ; err != nil {
return err
}
if ! strings . HasPrefix ( name , "usage_logs_" ) {
continue
}
suffix := strings . TrimPrefix ( name , "usage_logs_" )
month , err := time . Parse ( "200601" , suffix )
if err != nil {
continue
}
month = month . UTC ( )
if month . Before ( cutoffMonth ) {
if _ , err := r . sql . ExecContext ( ctx , fmt . Sprintf ( "DROP TABLE IF EXISTS %s" , pq . QuoteIdentifier ( name ) ) ) ; err != nil {
return err
}
}
}
return rows . Err ( )
}
// createUsageLogsPartition creates, if it does not exist yet, the usage_logs
// partition named usage_logs_YYYYMM for the month containing month (in UTC).
// The partition covers the range from the first day of that month up to the
// first day of the following month.
func (r *dashboardAggregationRepository) createUsageLogsPartition(ctx context.Context, month time.Time) error {
	from := truncateToMonthUTC(month)
	to := from.AddDate(0, 1, 0)

	// Identifiers and literals cannot be bound as parameters in DDL, so they
	// are quoted and interpolated instead.
	table := pq.QuoteIdentifier(fmt.Sprintf("usage_logs_%s", from.Format("200601")))
	lower := pq.QuoteLiteral(from.Format("2006-01-02"))
	upper := pq.QuoteLiteral(to.Format("2006-01-02"))

	stmt := fmt.Sprintf(
		"CREATE TABLE IF NOT EXISTS %s PARTITION OF usage_logs FOR VALUES FROM (%s) TO (%s)",
		table, lower, upper,
	)
	_, err := r.sql.ExecContext(ctx, stmt)
	return err
}
2026-01-15 11:22:13 +08:00
func truncateToDay ( t time . Time ) time . Time {
return timezone . StartOfDay ( t )
2026-01-11 16:01:35 +08:00
}
// truncateToMonthUTC converts t to UTC and returns midnight on the first day
// of that UTC month, with the result located in UTC.
func truncateToMonthUTC(t time.Time) time.Time {
	year, month, _ := t.UTC().Date()
	return time.Date(year, month, 1, 0, 0, 0, 0, time.UTC)
}