mirror of
https://gitee.com/wanwujie/sub2api
synced 2026-04-29 19:04:48 +08:00
fix: handle invalid encrypted content error and retry logic.
This commit is contained in:
@@ -477,6 +477,7 @@ func classifyOpenAIWSReconnectReason(err error) (string, bool) {
|
||||
"upgrade_required",
|
||||
"ws_unsupported",
|
||||
"auth_failed",
|
||||
"invalid_encrypted_content",
|
||||
"previous_response_not_found":
|
||||
return reason, false
|
||||
}
|
||||
@@ -527,6 +528,14 @@ func resolveOpenAIWSFallbackErrorResponse(err error) (statusCode int, errType st
|
||||
}
|
||||
|
||||
switch reason {
|
||||
case "invalid_encrypted_content":
|
||||
if statusCode == 0 {
|
||||
statusCode = http.StatusBadRequest
|
||||
}
|
||||
errType = "invalid_request_error"
|
||||
if upstreamMessage == "" {
|
||||
upstreamMessage = "encrypted content could not be verified"
|
||||
}
|
||||
case "previous_response_not_found":
|
||||
if statusCode == 0 {
|
||||
statusCode = http.StatusBadRequest
|
||||
@@ -1921,6 +1930,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
var wsErr error
|
||||
wsLastFailureReason := ""
|
||||
wsPrevResponseRecoveryTried := false
|
||||
wsInvalidEncryptedContentRecoveryTried := false
|
||||
recoverPrevResponseNotFound := func(attempt int) bool {
|
||||
if wsPrevResponseRecoveryTried {
|
||||
return false
|
||||
@@ -1953,6 +1963,37 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
)
|
||||
return true
|
||||
}
|
||||
recoverInvalidEncryptedContent := func(attempt int) bool {
|
||||
if wsInvalidEncryptedContentRecoveryTried {
|
||||
return false
|
||||
}
|
||||
removedReasoningItems := trimOpenAIEncryptedReasoningItems(wsReqBody)
|
||||
if !removedReasoningItems {
|
||||
logOpenAIWSModeInfo(
|
||||
"reconnect_invalid_encrypted_content_recovery_skip account_id=%d attempt=%d reason=missing_encrypted_reasoning_items",
|
||||
account.ID,
|
||||
attempt,
|
||||
)
|
||||
return false
|
||||
}
|
||||
previousResponseID := openAIWSPayloadString(wsReqBody, "previous_response_id")
|
||||
hasFunctionCallOutput := HasFunctionCallOutput(wsReqBody)
|
||||
if previousResponseID != "" && !hasFunctionCallOutput {
|
||||
delete(wsReqBody, "previous_response_id")
|
||||
}
|
||||
wsInvalidEncryptedContentRecoveryTried = true
|
||||
logOpenAIWSModeInfo(
|
||||
"reconnect_invalid_encrypted_content_recovery account_id=%d attempt=%d action=drop_encrypted_reasoning_items retry=1 previous_response_id_present=%v previous_response_id=%s previous_response_id_kind=%s has_function_call_output=%v dropped_previous_response_id=%v",
|
||||
account.ID,
|
||||
attempt,
|
||||
previousResponseID != "",
|
||||
truncateOpenAIWSLogValue(previousResponseID, openAIWSIDValueMaxLen),
|
||||
normalizeOpenAIWSLogValue(ClassifyOpenAIPreviousResponseIDKind(previousResponseID)),
|
||||
hasFunctionCallOutput,
|
||||
previousResponseID != "" && !hasFunctionCallOutput,
|
||||
)
|
||||
return true
|
||||
}
|
||||
retryBudget := s.openAIWSRetryTotalBudget()
|
||||
retryStartedAt := time.Now()
|
||||
wsRetryLoop:
|
||||
@@ -1989,6 +2030,9 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
if reason == "previous_response_not_found" && recoverPrevResponseNotFound(attempt) {
|
||||
continue
|
||||
}
|
||||
if reason == "invalid_encrypted_content" && recoverInvalidEncryptedContent(attempt) {
|
||||
continue
|
||||
}
|
||||
if retryable && attempt < maxAttempts {
|
||||
backoff := s.openAIWSRetryBackoff(attempt)
|
||||
if retryBudget > 0 && time.Since(retryStartedAt)+backoff > retryBudget {
|
||||
@@ -2072,124 +2116,141 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
||||
return nil, wsErr
|
||||
}
|
||||
|
||||
// Build upstream request
|
||||
upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, body, token, reqStream, promptCacheKey, isCodexCLI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpInvalidEncryptedContentRetryTried := false
|
||||
for {
|
||||
// Build upstream request
|
||||
upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, body, token, reqStream, promptCacheKey, isCodexCLI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get proxy URL
|
||||
proxyURL := ""
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
// Get proxy URL
|
||||
proxyURL := ""
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
// Send request
|
||||
upstreamStart := time.Now()
|
||||
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
SetOpsLatencyMs(c, OpsUpstreamLatencyMsKey, time.Since(upstreamStart).Milliseconds())
|
||||
if err != nil {
|
||||
// Ensure the client receives an error response (handlers assume Forward writes on non-failover errors).
|
||||
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
||||
setOpsUpstreamError(c, 0, safeErr, "")
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: 0,
|
||||
Kind: "request_error",
|
||||
Message: safeErr,
|
||||
})
|
||||
c.JSON(http.StatusBadGateway, gin.H{
|
||||
"error": gin.H{
|
||||
"type": "upstream_error",
|
||||
"message": "Upstream request failed",
|
||||
},
|
||||
})
|
||||
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// Handle error response
|
||||
if resp.StatusCode >= 400 {
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
_ = resp.Body.Close()
|
||||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||
|
||||
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
|
||||
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
||||
if s.shouldFailoverOpenAIUpstreamResponse(resp.StatusCode, upstreamMsg, respBody) {
|
||||
upstreamDetail := ""
|
||||
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
|
||||
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
|
||||
if maxBytes <= 0 {
|
||||
maxBytes = 2048
|
||||
}
|
||||
upstreamDetail = truncateString(string(respBody), maxBytes)
|
||||
}
|
||||
// Send request
|
||||
upstreamStart := time.Now()
|
||||
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
SetOpsLatencyMs(c, OpsUpstreamLatencyMsKey, time.Since(upstreamStart).Milliseconds())
|
||||
if err != nil {
|
||||
// Ensure the client receives an error response (handlers assume Forward writes on non-failover errors).
|
||||
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
||||
setOpsUpstreamError(c, 0, safeErr, "")
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
Kind: "failover",
|
||||
Message: upstreamMsg,
|
||||
Detail: upstreamDetail,
|
||||
UpstreamStatusCode: 0,
|
||||
Kind: "request_error",
|
||||
Message: safeErr,
|
||||
})
|
||||
c.JSON(http.StatusBadGateway, gin.H{
|
||||
"error": gin.H{
|
||||
"type": "upstream_error",
|
||||
"message": "Upstream request failed",
|
||||
},
|
||||
})
|
||||
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
|
||||
}
|
||||
|
||||
s.handleFailoverSideEffects(ctx, resp, account)
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: resp.StatusCode,
|
||||
ResponseBody: respBody,
|
||||
RetryableOnSameAccount: account.IsPoolMode() && (isPoolModeRetryableStatus(resp.StatusCode) || isOpenAITransientProcessingError(resp.StatusCode, upstreamMsg, respBody)),
|
||||
// Handle error response
|
||||
if resp.StatusCode >= 400 {
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
_ = resp.Body.Close()
|
||||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||
|
||||
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
|
||||
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
||||
upstreamCode := extractUpstreamErrorCode(respBody)
|
||||
if !httpInvalidEncryptedContentRetryTried && resp.StatusCode == http.StatusBadRequest && upstreamCode == "invalid_encrypted_content" {
|
||||
if trimOpenAIEncryptedReasoningItems(reqBody) {
|
||||
body, err = json.Marshal(reqBody)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("serialize invalid_encrypted_content retry body: %w", err)
|
||||
}
|
||||
setOpsUpstreamRequestBody(c, body)
|
||||
httpInvalidEncryptedContentRetryTried = true
|
||||
logger.LegacyPrintf("service.openai_gateway", "[OpenAI] Retrying non-WSv2 request once after invalid_encrypted_content (account: %s)", account.Name)
|
||||
continue
|
||||
}
|
||||
logger.LegacyPrintf("service.openai_gateway", "[OpenAI] Skip non-WSv2 invalid_encrypted_content retry because encrypted reasoning items are missing (account: %s)", account.Name)
|
||||
}
|
||||
if s.shouldFailoverOpenAIUpstreamResponse(resp.StatusCode, upstreamMsg, respBody) {
|
||||
upstreamDetail := ""
|
||||
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
|
||||
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
|
||||
if maxBytes <= 0 {
|
||||
maxBytes = 2048
|
||||
}
|
||||
upstreamDetail = truncateString(string(respBody), maxBytes)
|
||||
}
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
Kind: "failover",
|
||||
Message: upstreamMsg,
|
||||
Detail: upstreamDetail,
|
||||
})
|
||||
|
||||
s.handleFailoverSideEffects(ctx, resp, account)
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: resp.StatusCode,
|
||||
ResponseBody: respBody,
|
||||
RetryableOnSameAccount: account.IsPoolMode() && (isPoolModeRetryableStatus(resp.StatusCode) || isOpenAITransientProcessingError(resp.StatusCode, upstreamMsg, respBody)),
|
||||
}
|
||||
}
|
||||
return s.handleErrorResponse(ctx, resp, c, account, body)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// Handle normal response
|
||||
var usage *OpenAIUsage
|
||||
var firstTokenMs *int
|
||||
if reqStream {
|
||||
streamResult, err := s.handleStreamingResponse(ctx, resp, c, account, startTime, originalModel, mappedModel)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
usage = streamResult.usage
|
||||
firstTokenMs = streamResult.firstTokenMs
|
||||
} else {
|
||||
usage, err = s.handleNonStreamingResponse(ctx, resp, c, account, originalModel, mappedModel)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return s.handleErrorResponse(ctx, resp, c, account, body)
|
||||
}
|
||||
|
||||
// Handle normal response
|
||||
var usage *OpenAIUsage
|
||||
var firstTokenMs *int
|
||||
if reqStream {
|
||||
streamResult, err := s.handleStreamingResponse(ctx, resp, c, account, startTime, originalModel, mappedModel)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Extract and save Codex usage snapshot from response headers (for OAuth accounts)
|
||||
if account.Type == AccountTypeOAuth {
|
||||
if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil {
|
||||
s.updateCodexUsageSnapshot(ctx, account.ID, snapshot)
|
||||
}
|
||||
}
|
||||
usage = streamResult.usage
|
||||
firstTokenMs = streamResult.firstTokenMs
|
||||
} else {
|
||||
usage, err = s.handleNonStreamingResponse(ctx, resp, c, account, originalModel, mappedModel)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
if usage == nil {
|
||||
usage = &OpenAIUsage{}
|
||||
}
|
||||
|
||||
reasoningEffort := extractOpenAIReasoningEffort(reqBody, originalModel)
|
||||
serviceTier := extractOpenAIServiceTier(reqBody)
|
||||
|
||||
return &OpenAIForwardResult{
|
||||
RequestID: resp.Header.Get("x-request-id"),
|
||||
Usage: *usage,
|
||||
Model: originalModel,
|
||||
ServiceTier: serviceTier,
|
||||
ReasoningEffort: reasoningEffort,
|
||||
Stream: reqStream,
|
||||
OpenAIWSMode: false,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: firstTokenMs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Extract and save Codex usage snapshot from response headers (for OAuth accounts)
|
||||
if account.Type == AccountTypeOAuth {
|
||||
if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil {
|
||||
s.updateCodexUsageSnapshot(ctx, account.ID, snapshot)
|
||||
}
|
||||
}
|
||||
|
||||
if usage == nil {
|
||||
usage = &OpenAIUsage{}
|
||||
}
|
||||
|
||||
reasoningEffort := extractOpenAIReasoningEffort(reqBody, originalModel)
|
||||
serviceTier := extractOpenAIServiceTier(reqBody)
|
||||
|
||||
return &OpenAIForwardResult{
|
||||
RequestID: resp.Header.Get("x-request-id"),
|
||||
Usage: *usage,
|
||||
Model: originalModel,
|
||||
ServiceTier: serviceTier,
|
||||
ReasoningEffort: reasoningEffort,
|
||||
Stream: reqStream,
|
||||
OpenAIWSMode: false,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: firstTokenMs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
|
||||
@@ -3740,6 +3801,109 @@ func buildOpenAIResponsesURL(base string) string {
|
||||
return normalized + "/v1/responses"
|
||||
}
|
||||
|
||||
func trimOpenAIEncryptedReasoningItems(reqBody map[string]any) bool {
|
||||
if len(reqBody) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
inputValue, has := reqBody["input"]
|
||||
if !has {
|
||||
return false
|
||||
}
|
||||
|
||||
switch input := inputValue.(type) {
|
||||
case []any:
|
||||
filtered := input[:0]
|
||||
changed := false
|
||||
for _, item := range input {
|
||||
nextItem, itemChanged, keep := sanitizeEncryptedReasoningInputItem(item)
|
||||
if itemChanged {
|
||||
changed = true
|
||||
}
|
||||
if !keep {
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, nextItem)
|
||||
}
|
||||
if !changed {
|
||||
return false
|
||||
}
|
||||
if len(filtered) == 0 {
|
||||
delete(reqBody, "input")
|
||||
return true
|
||||
}
|
||||
reqBody["input"] = filtered
|
||||
return true
|
||||
case []map[string]any:
|
||||
filtered := input[:0]
|
||||
changed := false
|
||||
for _, item := range input {
|
||||
nextItem, itemChanged, keep := sanitizeEncryptedReasoningInputItem(item)
|
||||
if itemChanged {
|
||||
changed = true
|
||||
}
|
||||
if !keep {
|
||||
continue
|
||||
}
|
||||
nextMap, ok := nextItem.(map[string]any)
|
||||
if !ok {
|
||||
filtered = append(filtered, item)
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, nextMap)
|
||||
}
|
||||
if !changed {
|
||||
return false
|
||||
}
|
||||
if len(filtered) == 0 {
|
||||
delete(reqBody, "input")
|
||||
return true
|
||||
}
|
||||
reqBody["input"] = filtered
|
||||
return true
|
||||
case map[string]any:
|
||||
nextItem, changed, keep := sanitizeEncryptedReasoningInputItem(input)
|
||||
if !changed {
|
||||
return false
|
||||
}
|
||||
if !keep {
|
||||
delete(reqBody, "input")
|
||||
return true
|
||||
}
|
||||
nextMap, ok := nextItem.(map[string]any)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
reqBody["input"] = nextMap
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func sanitizeEncryptedReasoningInputItem(item any) (next any, changed bool, keep bool) {
|
||||
inputItem, ok := item.(map[string]any)
|
||||
if !ok {
|
||||
return item, false, true
|
||||
}
|
||||
|
||||
itemType, _ := inputItem["type"].(string)
|
||||
if strings.TrimSpace(itemType) != "reasoning" {
|
||||
return item, false, true
|
||||
}
|
||||
|
||||
_, hasEncryptedContent := inputItem["encrypted_content"]
|
||||
if !hasEncryptedContent {
|
||||
return item, false, true
|
||||
}
|
||||
|
||||
delete(inputItem, "encrypted_content")
|
||||
if len(inputItem) == 1 {
|
||||
return nil, true, false
|
||||
}
|
||||
return inputItem, true, true
|
||||
}
|
||||
|
||||
func IsOpenAIResponsesCompactPathForTest(c *gin.Context) bool {
|
||||
return isOpenAIResponsesCompactPath(c)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user