Files
sub2api/backend/internal/service/ops_upstream_context.go

201 lines
6.5 KiB
Go
Raw Permalink Normal View History

package service
import (
"encoding/json"
"strings"
"time"
"github.com/gin-gonic/gin"
)
// Gin context keys used by Ops error logger for capturing upstream error details.
// These keys are set by gateway services and consumed by handler/ops_error_logger.go.
const (
OpsUpstreamStatusCodeKey = "ops_upstream_status_code"
OpsUpstreamErrorMessageKey = "ops_upstream_error_message"
OpsUpstreamErrorDetailKey = "ops_upstream_error_detail"
OpsUpstreamErrorsKey = "ops_upstream_errors"
// Best-effort capture of the current upstream request body so ops can
// retry the specific upstream attempt (not just the client request).
// This value is sanitized+trimmed before being persisted.
OpsUpstreamRequestBodyKey = "ops_upstream_request_body"
// Optional stage latencies (milliseconds) for troubleshooting and alerting.
OpsAuthLatencyMsKey = "ops_auth_latency_ms"
OpsRoutingLatencyMsKey = "ops_routing_latency_ms"
OpsUpstreamLatencyMsKey = "ops_upstream_latency_ms"
OpsResponseLatencyMsKey = "ops_response_latency_ms"
OpsTimeToFirstTokenMsKey = "ops_time_to_first_token_ms"
// OpenAI WS 关键观测字段
OpsOpenAIWSQueueWaitMsKey = "ops_openai_ws_queue_wait_ms"
OpsOpenAIWSConnPickMsKey = "ops_openai_ws_conn_pick_ms"
OpsOpenAIWSConnReusedKey = "ops_openai_ws_conn_reused"
OpsOpenAIWSConnIDKey = "ops_openai_ws_conn_id"
2026-02-12 23:43:47 +08:00
// OpsSkipPassthroughKey 由 applyErrorPassthroughRule 在命中 skip_monitoring=true 的规则时设置。
// ops_error_logger 中间件检查此 key为 true 时跳过错误记录。
OpsSkipPassthroughKey = "ops_skip_passthrough"
)
func setOpsUpstreamRequestBody(c *gin.Context, body []byte) {
if c == nil || len(body) == 0 {
return
}
// 热路径避免 string(body) 额外分配,按需在落库前再转换。
c.Set(OpsUpstreamRequestBodyKey, body)
}
func SetOpsLatencyMs(c *gin.Context, key string, value int64) {
if c == nil || strings.TrimSpace(key) == "" || value < 0 {
return
}
c.Set(key, value)
}
func setOpsUpstreamError(c *gin.Context, upstreamStatusCode int, upstreamMessage, upstreamDetail string) {
if c == nil {
return
}
if upstreamStatusCode > 0 {
c.Set(OpsUpstreamStatusCodeKey, upstreamStatusCode)
}
if msg := strings.TrimSpace(upstreamMessage); msg != "" {
c.Set(OpsUpstreamErrorMessageKey, msg)
}
if detail := strings.TrimSpace(upstreamDetail); detail != "" {
c.Set(OpsUpstreamErrorDetailKey, detail)
}
}
// OpsUpstreamErrorEvent describes one upstream error attempt during a single gateway request.
// It is stored in ops_error_logs.upstream_errors as a JSON array.
type OpsUpstreamErrorEvent struct {
AtUnixMs int64 `json:"at_unix_ms,omitempty"`
// Passthrough 表示本次请求是否命中“原样透传(仅替换认证)”分支。
// 该字段用于排障与灰度评估;存入 JSON不涉及 DB schema 变更。
Passthrough bool `json:"passthrough,omitempty"`
// Context
Platform string `json:"platform,omitempty"`
AccountID int64 `json:"account_id,omitempty"`
AccountName string `json:"account_name,omitempty"`
// Outcome
UpstreamStatusCode int `json:"upstream_status_code,omitempty"`
UpstreamRequestID string `json:"upstream_request_id,omitempty"`
// Best-effort upstream request capture (sanitized+trimmed).
// Required for retrying a specific upstream attempt.
UpstreamRequestBody string `json:"upstream_request_body,omitempty"`
// Best-effort upstream response capture (sanitized+trimmed).
UpstreamResponseBody string `json:"upstream_response_body,omitempty"`
// Kind: http_error | request_error | retry_exhausted | failover
Kind string `json:"kind,omitempty"`
Message string `json:"message,omitempty"`
Detail string `json:"detail,omitempty"`
}
func appendOpsUpstreamError(c *gin.Context, ev OpsUpstreamErrorEvent) {
if c == nil {
return
}
if ev.AtUnixMs <= 0 {
ev.AtUnixMs = time.Now().UnixMilli()
}
ev.Platform = strings.TrimSpace(ev.Platform)
ev.UpstreamRequestID = strings.TrimSpace(ev.UpstreamRequestID)
ev.UpstreamRequestBody = strings.TrimSpace(ev.UpstreamRequestBody)
ev.UpstreamResponseBody = strings.TrimSpace(ev.UpstreamResponseBody)
ev.Kind = strings.TrimSpace(ev.Kind)
ev.Message = strings.TrimSpace(ev.Message)
ev.Detail = strings.TrimSpace(ev.Detail)
if ev.Message != "" {
ev.Message = sanitizeUpstreamErrorMessage(ev.Message)
}
// If the caller didn't explicitly pass upstream request body but the gateway
// stored it on the context, attach it so ops can retry this specific attempt.
if ev.UpstreamRequestBody == "" {
if v, ok := c.Get(OpsUpstreamRequestBodyKey); ok {
switch raw := v.(type) {
case string:
ev.UpstreamRequestBody = strings.TrimSpace(raw)
case []byte:
ev.UpstreamRequestBody = strings.TrimSpace(string(raw))
}
}
}
var existing []*OpsUpstreamErrorEvent
if v, ok := c.Get(OpsUpstreamErrorsKey); ok {
if arr, ok := v.([]*OpsUpstreamErrorEvent); ok {
existing = arr
}
}
evCopy := ev
existing = append(existing, &evCopy)
c.Set(OpsUpstreamErrorsKey, existing)
checkSkipMonitoringForUpstreamEvent(c, &evCopy)
}
// checkSkipMonitoringForUpstreamEvent checks whether the upstream error event
// matches a passthrough rule with skip_monitoring=true and, if so, sets the
// OpsSkipPassthroughKey on the context. This ensures intermediate retry /
// failover errors (which never go through the final applyErrorPassthroughRule
// path) can still suppress ops_error_logs recording.
func checkSkipMonitoringForUpstreamEvent(c *gin.Context, ev *OpsUpstreamErrorEvent) {
if ev.UpstreamStatusCode == 0 {
return
}
svc := getBoundErrorPassthroughService(c)
if svc == nil {
return
}
// Use the best available body representation for keyword matching.
// Even when body is empty, MatchRule can still match rules that only
// specify ErrorCodes (no Keywords), so we always call it.
body := ev.Detail
if body == "" {
body = ev.Message
}
rule := svc.MatchRule(ev.Platform, ev.UpstreamStatusCode, []byte(body))
if rule != nil && rule.SkipMonitoring {
c.Set(OpsSkipPassthroughKey, true)
}
}
func marshalOpsUpstreamErrors(events []*OpsUpstreamErrorEvent) *string {
if len(events) == 0 {
return nil
}
// Ensure we always store a valid JSON value.
raw, err := json.Marshal(events)
if err != nil || len(raw) == 0 {
return nil
}
s := string(raw)
return &s
}
func ParseOpsUpstreamErrors(raw string) ([]*OpsUpstreamErrorEvent, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return []*OpsUpstreamErrorEvent{}, nil
}
var out []*OpsUpstreamErrorEvent
if err := json.Unmarshal([]byte(raw), &out); err != nil {
return nil, err
}
return out, nil
}