mirror of
https://gitee.com/wanwujie/sub2api
synced 2026-04-27 09:54:47 +08:00
Merge pull request #999 from InCerryGit/fix/enc_coot
fix: handle invalid encrypted content error and retry logic.
This commit is contained in:
@@ -6129,6 +6129,29 @@ func extractUpstreamErrorMessage(body []byte) string {
|
|||||||
return gjson.GetBytes(body, "message").String()
|
return gjson.GetBytes(body, "message").String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func extractUpstreamErrorCode(body []byte) string {
|
||||||
|
if code := strings.TrimSpace(gjson.GetBytes(body, "error.code").String()); code != "" {
|
||||||
|
return code
|
||||||
|
}
|
||||||
|
|
||||||
|
inner := strings.TrimSpace(gjson.GetBytes(body, "error.message").String())
|
||||||
|
if !strings.HasPrefix(inner, "{") {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if code := strings.TrimSpace(gjson.Get(inner, "error.code").String()); code != "" {
|
||||||
|
return code
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastBrace := strings.LastIndex(inner, "}"); lastBrace >= 0 {
|
||||||
|
if code := strings.TrimSpace(gjson.Get(inner[:lastBrace+1], "error.code").String()); code != "" {
|
||||||
|
return code
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
func isCountTokensUnsupported404(statusCode int, body []byte) bool {
|
func isCountTokensUnsupported404(statusCode int, body []byte) bool {
|
||||||
if statusCode != http.StatusNotFound {
|
if statusCode != http.StatusNotFound {
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -480,6 +480,7 @@ func classifyOpenAIWSReconnectReason(err error) (string, bool) {
|
|||||||
"upgrade_required",
|
"upgrade_required",
|
||||||
"ws_unsupported",
|
"ws_unsupported",
|
||||||
"auth_failed",
|
"auth_failed",
|
||||||
|
"invalid_encrypted_content",
|
||||||
"previous_response_not_found":
|
"previous_response_not_found":
|
||||||
return reason, false
|
return reason, false
|
||||||
}
|
}
|
||||||
@@ -530,6 +531,14 @@ func resolveOpenAIWSFallbackErrorResponse(err error) (statusCode int, errType st
|
|||||||
}
|
}
|
||||||
|
|
||||||
switch reason {
|
switch reason {
|
||||||
|
case "invalid_encrypted_content":
|
||||||
|
if statusCode == 0 {
|
||||||
|
statusCode = http.StatusBadRequest
|
||||||
|
}
|
||||||
|
errType = "invalid_request_error"
|
||||||
|
if upstreamMessage == "" {
|
||||||
|
upstreamMessage = "encrypted content could not be verified"
|
||||||
|
}
|
||||||
case "previous_response_not_found":
|
case "previous_response_not_found":
|
||||||
if statusCode == 0 {
|
if statusCode == 0 {
|
||||||
statusCode = http.StatusBadRequest
|
statusCode = http.StatusBadRequest
|
||||||
@@ -1924,6 +1933,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
|||||||
var wsErr error
|
var wsErr error
|
||||||
wsLastFailureReason := ""
|
wsLastFailureReason := ""
|
||||||
wsPrevResponseRecoveryTried := false
|
wsPrevResponseRecoveryTried := false
|
||||||
|
wsInvalidEncryptedContentRecoveryTried := false
|
||||||
recoverPrevResponseNotFound := func(attempt int) bool {
|
recoverPrevResponseNotFound := func(attempt int) bool {
|
||||||
if wsPrevResponseRecoveryTried {
|
if wsPrevResponseRecoveryTried {
|
||||||
return false
|
return false
|
||||||
@@ -1956,6 +1966,37 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
|||||||
)
|
)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
recoverInvalidEncryptedContent := func(attempt int) bool {
|
||||||
|
if wsInvalidEncryptedContentRecoveryTried {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
removedReasoningItems := trimOpenAIEncryptedReasoningItems(wsReqBody)
|
||||||
|
if !removedReasoningItems {
|
||||||
|
logOpenAIWSModeInfo(
|
||||||
|
"reconnect_invalid_encrypted_content_recovery_skip account_id=%d attempt=%d reason=missing_encrypted_reasoning_items",
|
||||||
|
account.ID,
|
||||||
|
attempt,
|
||||||
|
)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
previousResponseID := openAIWSPayloadString(wsReqBody, "previous_response_id")
|
||||||
|
hasFunctionCallOutput := HasFunctionCallOutput(wsReqBody)
|
||||||
|
if previousResponseID != "" && !hasFunctionCallOutput {
|
||||||
|
delete(wsReqBody, "previous_response_id")
|
||||||
|
}
|
||||||
|
wsInvalidEncryptedContentRecoveryTried = true
|
||||||
|
logOpenAIWSModeInfo(
|
||||||
|
"reconnect_invalid_encrypted_content_recovery account_id=%d attempt=%d action=drop_encrypted_reasoning_items retry=1 previous_response_id_present=%v previous_response_id=%s previous_response_id_kind=%s has_function_call_output=%v dropped_previous_response_id=%v",
|
||||||
|
account.ID,
|
||||||
|
attempt,
|
||||||
|
previousResponseID != "",
|
||||||
|
truncateOpenAIWSLogValue(previousResponseID, openAIWSIDValueMaxLen),
|
||||||
|
normalizeOpenAIWSLogValue(ClassifyOpenAIPreviousResponseIDKind(previousResponseID)),
|
||||||
|
hasFunctionCallOutput,
|
||||||
|
previousResponseID != "" && !hasFunctionCallOutput,
|
||||||
|
)
|
||||||
|
return true
|
||||||
|
}
|
||||||
retryBudget := s.openAIWSRetryTotalBudget()
|
retryBudget := s.openAIWSRetryTotalBudget()
|
||||||
retryStartedAt := time.Now()
|
retryStartedAt := time.Now()
|
||||||
wsRetryLoop:
|
wsRetryLoop:
|
||||||
@@ -1992,6 +2033,9 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
|||||||
if reason == "previous_response_not_found" && recoverPrevResponseNotFound(attempt) {
|
if reason == "previous_response_not_found" && recoverPrevResponseNotFound(attempt) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if reason == "invalid_encrypted_content" && recoverInvalidEncryptedContent(attempt) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if retryable && attempt < maxAttempts {
|
if retryable && attempt < maxAttempts {
|
||||||
backoff := s.openAIWSRetryBackoff(attempt)
|
backoff := s.openAIWSRetryBackoff(attempt)
|
||||||
if retryBudget > 0 && time.Since(retryStartedAt)+backoff > retryBudget {
|
if retryBudget > 0 && time.Since(retryStartedAt)+backoff > retryBudget {
|
||||||
@@ -2075,126 +2119,143 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
|
|||||||
return nil, wsErr
|
return nil, wsErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build upstream request
|
httpInvalidEncryptedContentRetryTried := false
|
||||||
upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, reqStream)
|
for {
|
||||||
upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, body, token, reqStream, promptCacheKey, isCodexCLI)
|
// Build upstream request
|
||||||
releaseUpstreamCtx()
|
upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, reqStream)
|
||||||
if err != nil {
|
upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, body, token, reqStream, promptCacheKey, isCodexCLI)
|
||||||
return nil, err
|
releaseUpstreamCtx()
|
||||||
}
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// Get proxy URL
|
// Get proxy URL
|
||||||
proxyURL := ""
|
proxyURL := ""
|
||||||
if account.ProxyID != nil && account.Proxy != nil {
|
if account.ProxyID != nil && account.Proxy != nil {
|
||||||
proxyURL = account.Proxy.URL()
|
proxyURL = account.Proxy.URL()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send request
|
// Send request
|
||||||
upstreamStart := time.Now()
|
upstreamStart := time.Now()
|
||||||
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||||
SetOpsLatencyMs(c, OpsUpstreamLatencyMsKey, time.Since(upstreamStart).Milliseconds())
|
SetOpsLatencyMs(c, OpsUpstreamLatencyMsKey, time.Since(upstreamStart).Milliseconds())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Ensure the client receives an error response (handlers assume Forward writes on non-failover errors).
|
// Ensure the client receives an error response (handlers assume Forward writes on non-failover errors).
|
||||||
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
||||||
setOpsUpstreamError(c, 0, safeErr, "")
|
setOpsUpstreamError(c, 0, safeErr, "")
|
||||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
|
||||||
Platform: account.Platform,
|
|
||||||
AccountID: account.ID,
|
|
||||||
AccountName: account.Name,
|
|
||||||
UpstreamStatusCode: 0,
|
|
||||||
Kind: "request_error",
|
|
||||||
Message: safeErr,
|
|
||||||
})
|
|
||||||
c.JSON(http.StatusBadGateway, gin.H{
|
|
||||||
"error": gin.H{
|
|
||||||
"type": "upstream_error",
|
|
||||||
"message": "Upstream request failed",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
|
|
||||||
}
|
|
||||||
defer func() { _ = resp.Body.Close() }()
|
|
||||||
|
|
||||||
// Handle error response
|
|
||||||
if resp.StatusCode >= 400 {
|
|
||||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
|
||||||
_ = resp.Body.Close()
|
|
||||||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
|
||||||
|
|
||||||
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
|
|
||||||
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
|
||||||
if s.shouldFailoverOpenAIUpstreamResponse(resp.StatusCode, upstreamMsg, respBody) {
|
|
||||||
upstreamDetail := ""
|
|
||||||
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
|
|
||||||
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
|
|
||||||
if maxBytes <= 0 {
|
|
||||||
maxBytes = 2048
|
|
||||||
}
|
|
||||||
upstreamDetail = truncateString(string(respBody), maxBytes)
|
|
||||||
}
|
|
||||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||||
Platform: account.Platform,
|
Platform: account.Platform,
|
||||||
AccountID: account.ID,
|
AccountID: account.ID,
|
||||||
AccountName: account.Name,
|
AccountName: account.Name,
|
||||||
UpstreamStatusCode: resp.StatusCode,
|
UpstreamStatusCode: 0,
|
||||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
Kind: "request_error",
|
||||||
Kind: "failover",
|
Message: safeErr,
|
||||||
Message: upstreamMsg,
|
|
||||||
Detail: upstreamDetail,
|
|
||||||
})
|
})
|
||||||
|
c.JSON(http.StatusBadGateway, gin.H{
|
||||||
|
"error": gin.H{
|
||||||
|
"type": "upstream_error",
|
||||||
|
"message": "Upstream request failed",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
|
||||||
|
}
|
||||||
|
|
||||||
s.handleFailoverSideEffects(ctx, resp, account)
|
// Handle error response
|
||||||
return nil, &UpstreamFailoverError{
|
if resp.StatusCode >= 400 {
|
||||||
StatusCode: resp.StatusCode,
|
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||||
ResponseBody: respBody,
|
_ = resp.Body.Close()
|
||||||
RetryableOnSameAccount: account.IsPoolMode() && (isPoolModeRetryableStatus(resp.StatusCode) || isOpenAITransientProcessingError(resp.StatusCode, upstreamMsg, respBody)),
|
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||||
|
|
||||||
|
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
|
||||||
|
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
||||||
|
upstreamCode := extractUpstreamErrorCode(respBody)
|
||||||
|
if !httpInvalidEncryptedContentRetryTried && resp.StatusCode == http.StatusBadRequest && upstreamCode == "invalid_encrypted_content" {
|
||||||
|
if trimOpenAIEncryptedReasoningItems(reqBody) {
|
||||||
|
body, err = json.Marshal(reqBody)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("serialize invalid_encrypted_content retry body: %w", err)
|
||||||
|
}
|
||||||
|
setOpsUpstreamRequestBody(c, body)
|
||||||
|
httpInvalidEncryptedContentRetryTried = true
|
||||||
|
logger.LegacyPrintf("service.openai_gateway", "[OpenAI] Retrying non-WSv2 request once after invalid_encrypted_content (account: %s)", account.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logger.LegacyPrintf("service.openai_gateway", "[OpenAI] Skip non-WSv2 invalid_encrypted_content retry because encrypted reasoning items are missing (account: %s)", account.Name)
|
||||||
|
}
|
||||||
|
if s.shouldFailoverOpenAIUpstreamResponse(resp.StatusCode, upstreamMsg, respBody) {
|
||||||
|
upstreamDetail := ""
|
||||||
|
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
|
||||||
|
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
|
||||||
|
if maxBytes <= 0 {
|
||||||
|
maxBytes = 2048
|
||||||
|
}
|
||||||
|
upstreamDetail = truncateString(string(respBody), maxBytes)
|
||||||
|
}
|
||||||
|
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||||
|
Platform: account.Platform,
|
||||||
|
AccountID: account.ID,
|
||||||
|
AccountName: account.Name,
|
||||||
|
UpstreamStatusCode: resp.StatusCode,
|
||||||
|
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||||
|
Kind: "failover",
|
||||||
|
Message: upstreamMsg,
|
||||||
|
Detail: upstreamDetail,
|
||||||
|
})
|
||||||
|
|
||||||
|
s.handleFailoverSideEffects(ctx, resp, account)
|
||||||
|
return nil, &UpstreamFailoverError{
|
||||||
|
StatusCode: resp.StatusCode,
|
||||||
|
ResponseBody: respBody,
|
||||||
|
RetryableOnSameAccount: account.IsPoolMode() && (isPoolModeRetryableStatus(resp.StatusCode) || isOpenAITransientProcessingError(resp.StatusCode, upstreamMsg, respBody)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return s.handleErrorResponse(ctx, resp, c, account, body)
|
||||||
|
}
|
||||||
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
|
// Handle normal response
|
||||||
|
var usage *OpenAIUsage
|
||||||
|
var firstTokenMs *int
|
||||||
|
if reqStream {
|
||||||
|
streamResult, err := s.handleStreamingResponse(ctx, resp, c, account, startTime, originalModel, mappedModel)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
usage = streamResult.usage
|
||||||
|
firstTokenMs = streamResult.firstTokenMs
|
||||||
|
} else {
|
||||||
|
usage, err = s.handleNonStreamingResponse(ctx, resp, c, account, originalModel, mappedModel)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return s.handleErrorResponse(ctx, resp, c, account, body)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle normal response
|
// Extract and save Codex usage snapshot from response headers (for OAuth accounts)
|
||||||
var usage *OpenAIUsage
|
if account.Type == AccountTypeOAuth {
|
||||||
var firstTokenMs *int
|
if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil {
|
||||||
if reqStream {
|
s.updateCodexUsageSnapshot(ctx, account.ID, snapshot)
|
||||||
streamResult, err := s.handleStreamingResponse(ctx, resp, c, account, startTime, originalModel, mappedModel)
|
}
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
usage = streamResult.usage
|
|
||||||
firstTokenMs = streamResult.firstTokenMs
|
if usage == nil {
|
||||||
} else {
|
usage = &OpenAIUsage{}
|
||||||
usage, err = s.handleNonStreamingResponse(ctx, resp, c, account, originalModel, mappedModel)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reasoningEffort := extractOpenAIReasoningEffort(reqBody, originalModel)
|
||||||
|
serviceTier := extractOpenAIServiceTier(reqBody)
|
||||||
|
|
||||||
|
return &OpenAIForwardResult{
|
||||||
|
RequestID: resp.Header.Get("x-request-id"),
|
||||||
|
Usage: *usage,
|
||||||
|
Model: originalModel,
|
||||||
|
ServiceTier: serviceTier,
|
||||||
|
ReasoningEffort: reasoningEffort,
|
||||||
|
Stream: reqStream,
|
||||||
|
OpenAIWSMode: false,
|
||||||
|
Duration: time.Since(startTime),
|
||||||
|
FirstTokenMs: firstTokenMs,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract and save Codex usage snapshot from response headers (for OAuth accounts)
|
|
||||||
if account.Type == AccountTypeOAuth {
|
|
||||||
if snapshot := ParseCodexRateLimitHeaders(resp.Header); snapshot != nil {
|
|
||||||
s.updateCodexUsageSnapshot(ctx, account.ID, snapshot)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if usage == nil {
|
|
||||||
usage = &OpenAIUsage{}
|
|
||||||
}
|
|
||||||
|
|
||||||
reasoningEffort := extractOpenAIReasoningEffort(reqBody, originalModel)
|
|
||||||
serviceTier := extractOpenAIServiceTier(reqBody)
|
|
||||||
|
|
||||||
return &OpenAIForwardResult{
|
|
||||||
RequestID: resp.Header.Get("x-request-id"),
|
|
||||||
Usage: *usage,
|
|
||||||
Model: originalModel,
|
|
||||||
ServiceTier: serviceTier,
|
|
||||||
ReasoningEffort: reasoningEffort,
|
|
||||||
Stream: reqStream,
|
|
||||||
OpenAIWSMode: false,
|
|
||||||
Duration: time.Since(startTime),
|
|
||||||
FirstTokenMs: firstTokenMs,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
|
func (s *OpenAIGatewayService) forwardOpenAIPassthrough(
|
||||||
@@ -3756,6 +3817,109 @@ func buildOpenAIResponsesURL(base string) string {
|
|||||||
return normalized + "/v1/responses"
|
return normalized + "/v1/responses"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func trimOpenAIEncryptedReasoningItems(reqBody map[string]any) bool {
|
||||||
|
if len(reqBody) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
inputValue, has := reqBody["input"]
|
||||||
|
if !has {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
switch input := inputValue.(type) {
|
||||||
|
case []any:
|
||||||
|
filtered := input[:0]
|
||||||
|
changed := false
|
||||||
|
for _, item := range input {
|
||||||
|
nextItem, itemChanged, keep := sanitizeEncryptedReasoningInputItem(item)
|
||||||
|
if itemChanged {
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
if !keep {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
filtered = append(filtered, nextItem)
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if len(filtered) == 0 {
|
||||||
|
delete(reqBody, "input")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
reqBody["input"] = filtered
|
||||||
|
return true
|
||||||
|
case []map[string]any:
|
||||||
|
filtered := input[:0]
|
||||||
|
changed := false
|
||||||
|
for _, item := range input {
|
||||||
|
nextItem, itemChanged, keep := sanitizeEncryptedReasoningInputItem(item)
|
||||||
|
if itemChanged {
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
if !keep {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
nextMap, ok := nextItem.(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
filtered = append(filtered, item)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
filtered = append(filtered, nextMap)
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if len(filtered) == 0 {
|
||||||
|
delete(reqBody, "input")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
reqBody["input"] = filtered
|
||||||
|
return true
|
||||||
|
case map[string]any:
|
||||||
|
nextItem, changed, keep := sanitizeEncryptedReasoningInputItem(input)
|
||||||
|
if !changed {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if !keep {
|
||||||
|
delete(reqBody, "input")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
nextMap, ok := nextItem.(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
reqBody["input"] = nextMap
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sanitizeEncryptedReasoningInputItem(item any) (next any, changed bool, keep bool) {
|
||||||
|
inputItem, ok := item.(map[string]any)
|
||||||
|
if !ok {
|
||||||
|
return item, false, true
|
||||||
|
}
|
||||||
|
|
||||||
|
itemType, _ := inputItem["type"].(string)
|
||||||
|
if strings.TrimSpace(itemType) != "reasoning" {
|
||||||
|
return item, false, true
|
||||||
|
}
|
||||||
|
|
||||||
|
_, hasEncryptedContent := inputItem["encrypted_content"]
|
||||||
|
if !hasEncryptedContent {
|
||||||
|
return item, false, true
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(inputItem, "encrypted_content")
|
||||||
|
if len(inputItem) == 1 {
|
||||||
|
return nil, true, false
|
||||||
|
}
|
||||||
|
return inputItem, true, true
|
||||||
|
}
|
||||||
|
|
||||||
func IsOpenAIResponsesCompactPathForTest(c *gin.Context) bool {
|
func IsOpenAIResponsesCompactPathForTest(c *gin.Context) bool {
|
||||||
return isOpenAIResponsesCompactPath(c)
|
return isOpenAIResponsesCompactPath(c)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3922,6 +3922,8 @@ func classifyOpenAIWSErrorEventFromRaw(codeRaw, errTypeRaw, msgRaw string) (stri
|
|||||||
return "ws_unsupported", true
|
return "ws_unsupported", true
|
||||||
case "websocket_connection_limit_reached":
|
case "websocket_connection_limit_reached":
|
||||||
return "ws_connection_limit_reached", true
|
return "ws_connection_limit_reached", true
|
||||||
|
case "invalid_encrypted_content":
|
||||||
|
return "invalid_encrypted_content", true
|
||||||
case "previous_response_not_found":
|
case "previous_response_not_found":
|
||||||
return "previous_response_not_found", true
|
return "previous_response_not_found", true
|
||||||
}
|
}
|
||||||
@@ -3940,6 +3942,10 @@ func classifyOpenAIWSErrorEventFromRaw(codeRaw, errTypeRaw, msgRaw string) (stri
|
|||||||
if strings.Contains(msg, "connection limit") && strings.Contains(msg, "websocket") {
|
if strings.Contains(msg, "connection limit") && strings.Contains(msg, "websocket") {
|
||||||
return "ws_connection_limit_reached", true
|
return "ws_connection_limit_reached", true
|
||||||
}
|
}
|
||||||
|
if strings.Contains(msg, "invalid_encrypted_content") ||
|
||||||
|
(strings.Contains(msg, "encrypted content") && strings.Contains(msg, "could not be verified")) {
|
||||||
|
return "invalid_encrypted_content", true
|
||||||
|
}
|
||||||
if strings.Contains(msg, "previous_response_not_found") ||
|
if strings.Contains(msg, "previous_response_not_found") ||
|
||||||
(strings.Contains(msg, "previous response") && strings.Contains(msg, "not found")) {
|
(strings.Contains(msg, "previous response") && strings.Contains(msg, "not found")) {
|
||||||
return "previous_response_not_found", true
|
return "previous_response_not_found", true
|
||||||
@@ -3964,6 +3970,7 @@ func openAIWSErrorHTTPStatusFromRaw(codeRaw, errTypeRaw string) int {
|
|||||||
case strings.Contains(errType, "invalid_request"),
|
case strings.Contains(errType, "invalid_request"),
|
||||||
strings.Contains(code, "invalid_request"),
|
strings.Contains(code, "invalid_request"),
|
||||||
strings.Contains(code, "bad_request"),
|
strings.Contains(code, "bad_request"),
|
||||||
|
code == "invalid_encrypted_content",
|
||||||
code == "previous_response_not_found":
|
code == "previous_response_not_found":
|
||||||
return http.StatusBadRequest
|
return http.StatusBadRequest
|
||||||
case strings.Contains(errType, "authentication"),
|
case strings.Contains(errType, "authentication"),
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package service
|
package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
@@ -19,6 +20,47 @@ import (
|
|||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type httpUpstreamSequenceRecorder struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
bodies [][]byte
|
||||||
|
reqs []*http.Request
|
||||||
|
|
||||||
|
responses []*http.Response
|
||||||
|
errs []error
|
||||||
|
callCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *httpUpstreamSequenceRecorder) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) {
|
||||||
|
u.mu.Lock()
|
||||||
|
defer u.mu.Unlock()
|
||||||
|
|
||||||
|
idx := u.callCount
|
||||||
|
u.callCount++
|
||||||
|
u.reqs = append(u.reqs, req)
|
||||||
|
if req != nil && req.Body != nil {
|
||||||
|
b, _ := io.ReadAll(req.Body)
|
||||||
|
u.bodies = append(u.bodies, b)
|
||||||
|
_ = req.Body.Close()
|
||||||
|
req.Body = io.NopCloser(bytes.NewReader(b))
|
||||||
|
} else {
|
||||||
|
u.bodies = append(u.bodies, nil)
|
||||||
|
}
|
||||||
|
if idx < len(u.errs) && u.errs[idx] != nil {
|
||||||
|
return nil, u.errs[idx]
|
||||||
|
}
|
||||||
|
if idx < len(u.responses) {
|
||||||
|
return u.responses[idx], nil
|
||||||
|
}
|
||||||
|
if len(u.responses) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return u.responses[len(u.responses)-1], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *httpUpstreamSequenceRecorder) DoWithTLS(req *http.Request, proxyURL string, accountID int64, accountConcurrency int, enableTLSFingerprint bool) (*http.Response, error) {
|
||||||
|
return u.Do(req, proxyURL, accountID, accountConcurrency)
|
||||||
|
}
|
||||||
|
|
||||||
func TestOpenAIGatewayService_Forward_PreservePreviousResponseIDWhenWSEnabled(t *testing.T) {
|
func TestOpenAIGatewayService_Forward_PreservePreviousResponseIDWhenWSEnabled(t *testing.T) {
|
||||||
gin.SetMode(gin.TestMode)
|
gin.SetMode(gin.TestMode)
|
||||||
wsFallbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
wsFallbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@@ -143,6 +185,176 @@ func TestOpenAIGatewayService_Forward_HTTPIngressStaysHTTPWhenWSEnabled(t *testi
|
|||||||
require.Equal(t, "client_protocol_http", reason)
|
require.Equal(t, "client_protocol_http", reason)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOpenAIGatewayService_Forward_HTTPIngressRetriesInvalidEncryptedContentOnce(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
wsFallbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
http.NotFound(w, r)
|
||||||
|
}))
|
||||||
|
defer wsFallbackServer.Close()
|
||||||
|
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/openai/v1/responses", nil)
|
||||||
|
c.Request.Header.Set("User-Agent", "custom-client/1.0")
|
||||||
|
SetOpenAIClientTransport(c, OpenAIClientTransportHTTP)
|
||||||
|
|
||||||
|
upstream := &httpUpstreamSequenceRecorder{
|
||||||
|
responses: []*http.Response{
|
||||||
|
{
|
||||||
|
StatusCode: http.StatusBadRequest,
|
||||||
|
Header: http.Header{"Content-Type": []string{"application/json"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(
|
||||||
|
`{"error":{"code":"invalid_encrypted_content","type":"invalid_request_error","message":"The encrypted content could not be verified."}}`,
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{"Content-Type": []string{"application/json"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(
|
||||||
|
`{"id":"resp_http_retry_ok","usage":{"input_tokens":1,"output_tokens":2,"input_tokens_details":{"cached_tokens":0}}}`,
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := &config.Config{}
|
||||||
|
cfg.Security.URLAllowlist.Enabled = false
|
||||||
|
cfg.Security.URLAllowlist.AllowInsecureHTTP = true
|
||||||
|
cfg.Gateway.OpenAIWS.Enabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.OAuthEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.APIKeyEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.ResponsesWebsocketsV2 = true
|
||||||
|
|
||||||
|
svc := &OpenAIGatewayService{
|
||||||
|
cfg: cfg,
|
||||||
|
httpUpstream: upstream,
|
||||||
|
openaiWSResolver: NewOpenAIWSProtocolResolver(cfg),
|
||||||
|
}
|
||||||
|
|
||||||
|
account := &Account{
|
||||||
|
ID: 102,
|
||||||
|
Name: "openai-apikey",
|
||||||
|
Platform: PlatformOpenAI,
|
||||||
|
Type: AccountTypeAPIKey,
|
||||||
|
Concurrency: 1,
|
||||||
|
Credentials: map[string]any{
|
||||||
|
"api_key": "sk-test",
|
||||||
|
"base_url": wsFallbackServer.URL,
|
||||||
|
},
|
||||||
|
Extra: map[string]any{
|
||||||
|
"responses_websockets_v2_enabled": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
body := []byte(`{"model":"gpt-5.1","stream":false,"previous_response_id":"resp_http_retry","input":[{"type":"reasoning","encrypted_content":"gAAA","summary":[{"type":"summary_text","text":"keep me"}]},{"type":"input_text","text":"hello"}]}`)
|
||||||
|
result, err := svc.Forward(context.Background(), c, account, body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, result)
|
||||||
|
require.False(t, result.OpenAIWSMode, "HTTP 入站应保持 HTTP 转发")
|
||||||
|
require.Equal(t, 2, upstream.callCount, "命中 invalid_encrypted_content 后应只在 HTTP 路径重试一次")
|
||||||
|
require.Len(t, upstream.bodies, 2)
|
||||||
|
|
||||||
|
firstBody := upstream.bodies[0]
|
||||||
|
secondBody := upstream.bodies[1]
|
||||||
|
require.False(t, gjson.GetBytes(firstBody, "previous_response_id").Exists(), "HTTP 首次请求仍应沿用原逻辑移除 previous_response_id")
|
||||||
|
require.True(t, gjson.GetBytes(firstBody, "input.0.encrypted_content").Exists(), "首次请求不应做发送前预清理")
|
||||||
|
require.Equal(t, "keep me", gjson.GetBytes(firstBody, "input.0.summary.0.text").String())
|
||||||
|
|
||||||
|
require.False(t, gjson.GetBytes(secondBody, "previous_response_id").Exists(), "HTTP 精确重试不应重新带回 previous_response_id")
|
||||||
|
require.False(t, gjson.GetBytes(secondBody, "input.0.encrypted_content").Exists(), "精确重试应移除 reasoning.encrypted_content")
|
||||||
|
require.Equal(t, "keep me", gjson.GetBytes(secondBody, "input.0.summary.0.text").String(), "精确重试应保留有效 reasoning summary")
|
||||||
|
require.Equal(t, "input_text", gjson.GetBytes(secondBody, "input.1.type").String(), "非 reasoning input 应保持原样")
|
||||||
|
|
||||||
|
decision, _ := c.Get("openai_ws_transport_decision")
|
||||||
|
reason, _ := c.Get("openai_ws_transport_reason")
|
||||||
|
require.Equal(t, string(OpenAIUpstreamTransportHTTPSSE), decision)
|
||||||
|
require.Equal(t, "client_protocol_http", reason)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOpenAIGatewayService_Forward_HTTPIngressRetriesWrappedInvalidEncryptedContentOnce(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
wsFallbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
http.NotFound(w, r)
|
||||||
|
}))
|
||||||
|
defer wsFallbackServer.Close()
|
||||||
|
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/openai/v1/responses", nil)
|
||||||
|
c.Request.Header.Set("User-Agent", "custom-client/1.0")
|
||||||
|
SetOpenAIClientTransport(c, OpenAIClientTransportHTTP)
|
||||||
|
|
||||||
|
upstream := &httpUpstreamSequenceRecorder{
|
||||||
|
responses: []*http.Response{
|
||||||
|
{
|
||||||
|
StatusCode: http.StatusBadRequest,
|
||||||
|
Header: http.Header{"Content-Type": []string{"application/json"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(
|
||||||
|
`{"error":{"code":null,"message":"{\"error\":{\"message\":\"The encrypted content could not be verified.\",\"type\":\"invalid_request_error\",\"param\":null,\"code\":\"invalid_encrypted_content\"}}(traceid: fb7ad1dbc7699c18f8a02f258f1af5ab)","param":null,"type":"invalid_request_error"}}`,
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{
|
||||||
|
"Content-Type": []string{"application/json"},
|
||||||
|
"x-request-id": []string{"req_http_retry_wrapped_ok"},
|
||||||
|
},
|
||||||
|
Body: io.NopCloser(strings.NewReader(
|
||||||
|
`{"id":"resp_http_retry_wrapped_ok","usage":{"input_tokens":1,"output_tokens":2,"input_tokens_details":{"cached_tokens":0}}}`,
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := &config.Config{}
|
||||||
|
cfg.Security.URLAllowlist.Enabled = false
|
||||||
|
cfg.Security.URLAllowlist.AllowInsecureHTTP = true
|
||||||
|
cfg.Gateway.OpenAIWS.Enabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.OAuthEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.APIKeyEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.ResponsesWebsocketsV2 = true
|
||||||
|
|
||||||
|
svc := &OpenAIGatewayService{
|
||||||
|
cfg: cfg,
|
||||||
|
httpUpstream: upstream,
|
||||||
|
openaiWSResolver: NewOpenAIWSProtocolResolver(cfg),
|
||||||
|
}
|
||||||
|
|
||||||
|
account := &Account{
|
||||||
|
ID: 103,
|
||||||
|
Name: "openai-apikey-wrapped",
|
||||||
|
Platform: PlatformOpenAI,
|
||||||
|
Type: AccountTypeAPIKey,
|
||||||
|
Concurrency: 1,
|
||||||
|
Credentials: map[string]any{
|
||||||
|
"api_key": "sk-test",
|
||||||
|
"base_url": wsFallbackServer.URL,
|
||||||
|
},
|
||||||
|
Extra: map[string]any{
|
||||||
|
"responses_websockets_v2_enabled": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
body := []byte(`{"model":"gpt-5.1","stream":false,"previous_response_id":"resp_http_retry_wrapped","input":[{"type":"reasoning","encrypted_content":"gAAA","summary":[{"type":"summary_text","text":"keep me too"}]},{"type":"input_text","text":"hello"}]}`)
|
||||||
|
result, err := svc.Forward(context.Background(), c, account, body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, result)
|
||||||
|
require.False(t, result.OpenAIWSMode, "HTTP 入站应保持 HTTP 转发")
|
||||||
|
require.Equal(t, 2, upstream.callCount, "wrapped invalid_encrypted_content 也应只在 HTTP 路径重试一次")
|
||||||
|
require.Len(t, upstream.bodies, 2)
|
||||||
|
|
||||||
|
firstBody := upstream.bodies[0]
|
||||||
|
secondBody := upstream.bodies[1]
|
||||||
|
require.True(t, gjson.GetBytes(firstBody, "input.0.encrypted_content").Exists(), "首次请求不应做发送前预清理")
|
||||||
|
require.False(t, gjson.GetBytes(secondBody, "input.0.encrypted_content").Exists(), "wrapped exact retry 应移除 reasoning.encrypted_content")
|
||||||
|
require.Equal(t, "keep me too", gjson.GetBytes(secondBody, "input.0.summary.0.text").String(), "wrapped exact retry 应保留有效 reasoning summary")
|
||||||
|
|
||||||
|
decision, _ := c.Get("openai_ws_transport_decision")
|
||||||
|
reason, _ := c.Get("openai_ws_transport_reason")
|
||||||
|
require.Equal(t, string(OpenAIUpstreamTransportHTTPSSE), decision)
|
||||||
|
require.Equal(t, "client_protocol_http", reason)
|
||||||
|
}
|
||||||
|
|
||||||
func TestOpenAIGatewayService_Forward_RemovePreviousResponseIDWhenWSDisabled(t *testing.T) {
|
func TestOpenAIGatewayService_Forward_RemovePreviousResponseIDWhenWSDisabled(t *testing.T) {
|
||||||
gin.SetMode(gin.TestMode)
|
gin.SetMode(gin.TestMode)
|
||||||
wsFallbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
wsFallbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@@ -1218,3 +1430,460 @@ func TestOpenAIGatewayService_Forward_WSv2PreviousResponseNotFoundOnlyRecoversOn
|
|||||||
require.True(t, gjson.GetBytes(requests[0], "previous_response_id").Exists(), "首轮请求应包含 previous_response_id")
|
require.True(t, gjson.GetBytes(requests[0], "previous_response_id").Exists(), "首轮请求应包含 previous_response_id")
|
||||||
require.False(t, gjson.GetBytes(requests[1], "previous_response_id").Exists(), "恢复重试应移除 previous_response_id")
|
require.False(t, gjson.GetBytes(requests[1], "previous_response_id").Exists(), "恢复重试应移除 previous_response_id")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOpenAIGatewayService_Forward_WSv2InvalidEncryptedContentRecoversOnce(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
|
var wsAttempts atomic.Int32
|
||||||
|
var wsRequestPayloads [][]byte
|
||||||
|
var wsRequestMu sync.Mutex
|
||||||
|
upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
|
||||||
|
wsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
attempt := wsAttempts.Add(1)
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("upgrade websocket failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
t.Errorf("read ws request failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
reqRaw, _ := json.Marshal(req)
|
||||||
|
wsRequestMu.Lock()
|
||||||
|
wsRequestPayloads = append(wsRequestPayloads, reqRaw)
|
||||||
|
wsRequestMu.Unlock()
|
||||||
|
if attempt == 1 {
|
||||||
|
_ = conn.WriteJSON(map[string]any{
|
||||||
|
"type": "error",
|
||||||
|
"error": map[string]any{
|
||||||
|
"code": "invalid_encrypted_content",
|
||||||
|
"type": "invalid_request_error",
|
||||||
|
"message": "The encrypted content could not be verified.",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = conn.WriteJSON(map[string]any{
|
||||||
|
"type": "response.completed",
|
||||||
|
"response": map[string]any{
|
||||||
|
"id": "resp_ws_invalid_encrypted_content_recover_ok",
|
||||||
|
"model": "gpt-5.3-codex",
|
||||||
|
"usage": map[string]any{
|
||||||
|
"input_tokens": 1,
|
||||||
|
"output_tokens": 1,
|
||||||
|
"input_tokens_details": map[string]any{
|
||||||
|
"cached_tokens": 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer wsServer.Close()
|
||||||
|
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/openai/v1/responses", nil)
|
||||||
|
c.Request.Header.Set("User-Agent", "custom-client/1.0")
|
||||||
|
|
||||||
|
upstream := &httpUpstreamRecorder{
|
||||||
|
resp: &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{"Content-Type": []string{"application/json"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(`{"id":"resp_http_drop_reasoning","usage":{"input_tokens":1,"output_tokens":1}}`)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := &config.Config{}
|
||||||
|
cfg.Security.URLAllowlist.Enabled = false
|
||||||
|
cfg.Security.URLAllowlist.AllowInsecureHTTP = true
|
||||||
|
cfg.Gateway.OpenAIWS.Enabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.OAuthEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.APIKeyEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.ResponsesWebsocketsV2 = true
|
||||||
|
cfg.Gateway.OpenAIWS.FallbackCooldownSeconds = 1
|
||||||
|
|
||||||
|
svc := &OpenAIGatewayService{
|
||||||
|
cfg: cfg,
|
||||||
|
httpUpstream: upstream,
|
||||||
|
openaiWSResolver: NewOpenAIWSProtocolResolver(cfg),
|
||||||
|
toolCorrector: NewCodexToolCorrector(),
|
||||||
|
}
|
||||||
|
|
||||||
|
account := &Account{
|
||||||
|
ID: 95,
|
||||||
|
Name: "openai-apikey",
|
||||||
|
Platform: PlatformOpenAI,
|
||||||
|
Type: AccountTypeAPIKey,
|
||||||
|
Concurrency: 1,
|
||||||
|
Credentials: map[string]any{
|
||||||
|
"api_key": "sk-test",
|
||||||
|
"base_url": wsServer.URL,
|
||||||
|
},
|
||||||
|
Extra: map[string]any{
|
||||||
|
"responses_websockets_v2_enabled": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
body := []byte(`{"model":"gpt-5.3-codex","stream":false,"previous_response_id":"resp_prev_encrypted","input":[{"type":"reasoning","encrypted_content":"gAAA"},{"type":"input_text","text":"hello"}]}`)
|
||||||
|
result, err := svc.Forward(context.Background(), c, account, body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, result)
|
||||||
|
require.Equal(t, "resp_ws_invalid_encrypted_content_recover_ok", result.RequestID)
|
||||||
|
require.Nil(t, upstream.lastReq, "invalid_encrypted_content 不应回退 HTTP")
|
||||||
|
require.Equal(t, int32(2), wsAttempts.Load(), "invalid_encrypted_content 应触发一次清洗后重试")
|
||||||
|
require.Equal(t, http.StatusOK, rec.Code)
|
||||||
|
require.Equal(t, "resp_ws_invalid_encrypted_content_recover_ok", gjson.Get(rec.Body.String(), "id").String())
|
||||||
|
|
||||||
|
wsRequestMu.Lock()
|
||||||
|
requests := append([][]byte(nil), wsRequestPayloads...)
|
||||||
|
wsRequestMu.Unlock()
|
||||||
|
require.Len(t, requests, 2)
|
||||||
|
require.True(t, gjson.GetBytes(requests[0], "previous_response_id").Exists(), "首轮请求应保留 previous_response_id")
|
||||||
|
require.True(t, gjson.GetBytes(requests[0], `input.0.encrypted_content`).Exists(), "首轮请求应保留 encrypted reasoning")
|
||||||
|
require.False(t, gjson.GetBytes(requests[1], "previous_response_id").Exists(), "恢复重试应移除 previous_response_id")
|
||||||
|
require.False(t, gjson.GetBytes(requests[1], `input.0.encrypted_content`).Exists(), "恢复重试应移除 encrypted reasoning item")
|
||||||
|
require.Equal(t, "input_text", gjson.GetBytes(requests[1], `input.0.type`).String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOpenAIGatewayService_Forward_WSv2InvalidEncryptedContentSkipsRecoveryWithoutReasoningItem(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
|
var wsAttempts atomic.Int32
|
||||||
|
var wsRequestPayloads [][]byte
|
||||||
|
var wsRequestMu sync.Mutex
|
||||||
|
upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
|
||||||
|
wsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
wsAttempts.Add(1)
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("upgrade websocket failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
t.Errorf("read ws request failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
reqRaw, _ := json.Marshal(req)
|
||||||
|
wsRequestMu.Lock()
|
||||||
|
wsRequestPayloads = append(wsRequestPayloads, reqRaw)
|
||||||
|
wsRequestMu.Unlock()
|
||||||
|
_ = conn.WriteJSON(map[string]any{
|
||||||
|
"type": "error",
|
||||||
|
"error": map[string]any{
|
||||||
|
"code": "invalid_encrypted_content",
|
||||||
|
"type": "invalid_request_error",
|
||||||
|
"message": "The encrypted content could not be verified.",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer wsServer.Close()
|
||||||
|
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/openai/v1/responses", nil)
|
||||||
|
c.Request.Header.Set("User-Agent", "custom-client/1.0")
|
||||||
|
|
||||||
|
upstream := &httpUpstreamRecorder{
|
||||||
|
resp: &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{"Content-Type": []string{"application/json"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(`{"id":"resp_http_drop_reasoning","usage":{"input_tokens":1,"output_tokens":1}}`)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := &config.Config{}
|
||||||
|
cfg.Security.URLAllowlist.Enabled = false
|
||||||
|
cfg.Security.URLAllowlist.AllowInsecureHTTP = true
|
||||||
|
cfg.Gateway.OpenAIWS.Enabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.OAuthEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.APIKeyEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.ResponsesWebsocketsV2 = true
|
||||||
|
cfg.Gateway.OpenAIWS.FallbackCooldownSeconds = 1
|
||||||
|
|
||||||
|
svc := &OpenAIGatewayService{
|
||||||
|
cfg: cfg,
|
||||||
|
httpUpstream: upstream,
|
||||||
|
openaiWSResolver: NewOpenAIWSProtocolResolver(cfg),
|
||||||
|
toolCorrector: NewCodexToolCorrector(),
|
||||||
|
}
|
||||||
|
|
||||||
|
account := &Account{
|
||||||
|
ID: 96,
|
||||||
|
Name: "openai-apikey",
|
||||||
|
Platform: PlatformOpenAI,
|
||||||
|
Type: AccountTypeAPIKey,
|
||||||
|
Concurrency: 1,
|
||||||
|
Credentials: map[string]any{
|
||||||
|
"api_key": "sk-test",
|
||||||
|
"base_url": wsServer.URL,
|
||||||
|
},
|
||||||
|
Extra: map[string]any{
|
||||||
|
"responses_websockets_v2_enabled": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
body := []byte(`{"model":"gpt-5.3-codex","stream":false,"previous_response_id":"resp_prev_encrypted","input":[{"type":"input_text","text":"hello"}]}`)
|
||||||
|
result, err := svc.Forward(context.Background(), c, account, body)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Nil(t, result)
|
||||||
|
require.Nil(t, upstream.lastReq, "invalid_encrypted_content 不应回退 HTTP")
|
||||||
|
require.Equal(t, int32(1), wsAttempts.Load(), "缺少 reasoning encrypted item 时应跳过自动恢复重试")
|
||||||
|
require.Equal(t, http.StatusBadRequest, rec.Code)
|
||||||
|
require.Contains(t, strings.ToLower(rec.Body.String()), "encrypted content")
|
||||||
|
|
||||||
|
wsRequestMu.Lock()
|
||||||
|
requests := append([][]byte(nil), wsRequestPayloads...)
|
||||||
|
wsRequestMu.Unlock()
|
||||||
|
require.Len(t, requests, 1)
|
||||||
|
require.True(t, gjson.GetBytes(requests[0], "previous_response_id").Exists())
|
||||||
|
require.False(t, gjson.GetBytes(requests[0], `input.0.encrypted_content`).Exists())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOpenAIGatewayService_Forward_WSv2InvalidEncryptedContentRecoversSingleObjectInputAndKeepsSummary(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
|
var wsAttempts atomic.Int32
|
||||||
|
var wsRequestPayloads [][]byte
|
||||||
|
var wsRequestMu sync.Mutex
|
||||||
|
upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
|
||||||
|
wsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
attempt := wsAttempts.Add(1)
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("upgrade websocket failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
t.Errorf("read ws request failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
reqRaw, _ := json.Marshal(req)
|
||||||
|
wsRequestMu.Lock()
|
||||||
|
wsRequestPayloads = append(wsRequestPayloads, reqRaw)
|
||||||
|
wsRequestMu.Unlock()
|
||||||
|
if attempt == 1 {
|
||||||
|
_ = conn.WriteJSON(map[string]any{
|
||||||
|
"type": "error",
|
||||||
|
"error": map[string]any{
|
||||||
|
"code": "invalid_encrypted_content",
|
||||||
|
"type": "invalid_request_error",
|
||||||
|
"message": "The encrypted content could not be verified.",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = conn.WriteJSON(map[string]any{
|
||||||
|
"type": "response.completed",
|
||||||
|
"response": map[string]any{
|
||||||
|
"id": "resp_ws_invalid_encrypted_content_object_ok",
|
||||||
|
"model": "gpt-5.3-codex",
|
||||||
|
"usage": map[string]any{
|
||||||
|
"input_tokens": 1,
|
||||||
|
"output_tokens": 1,
|
||||||
|
"input_tokens_details": map[string]any{
|
||||||
|
"cached_tokens": 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer wsServer.Close()
|
||||||
|
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/openai/v1/responses", nil)
|
||||||
|
c.Request.Header.Set("User-Agent", "custom-client/1.0")
|
||||||
|
|
||||||
|
upstream := &httpUpstreamRecorder{
|
||||||
|
resp: &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{"Content-Type": []string{"application/json"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(`{"id":"resp_http_drop_reasoning","usage":{"input_tokens":1,"output_tokens":1}}`)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := &config.Config{}
|
||||||
|
cfg.Security.URLAllowlist.Enabled = false
|
||||||
|
cfg.Security.URLAllowlist.AllowInsecureHTTP = true
|
||||||
|
cfg.Gateway.OpenAIWS.Enabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.OAuthEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.APIKeyEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.ResponsesWebsocketsV2 = true
|
||||||
|
cfg.Gateway.OpenAIWS.FallbackCooldownSeconds = 1
|
||||||
|
|
||||||
|
svc := &OpenAIGatewayService{
|
||||||
|
cfg: cfg,
|
||||||
|
httpUpstream: upstream,
|
||||||
|
openaiWSResolver: NewOpenAIWSProtocolResolver(cfg),
|
||||||
|
toolCorrector: NewCodexToolCorrector(),
|
||||||
|
}
|
||||||
|
|
||||||
|
account := &Account{
|
||||||
|
ID: 97,
|
||||||
|
Name: "openai-apikey",
|
||||||
|
Platform: PlatformOpenAI,
|
||||||
|
Type: AccountTypeAPIKey,
|
||||||
|
Concurrency: 1,
|
||||||
|
Credentials: map[string]any{
|
||||||
|
"api_key": "sk-test",
|
||||||
|
"base_url": wsServer.URL,
|
||||||
|
},
|
||||||
|
Extra: map[string]any{
|
||||||
|
"responses_websockets_v2_enabled": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
body := []byte(`{"model":"gpt-5.3-codex","stream":false,"previous_response_id":"resp_prev_encrypted","input":{"type":"reasoning","encrypted_content":"gAAA","summary":[{"type":"summary_text","text":"keep me"}]}}`)
|
||||||
|
result, err := svc.Forward(context.Background(), c, account, body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, result)
|
||||||
|
require.Equal(t, "resp_ws_invalid_encrypted_content_object_ok", result.RequestID)
|
||||||
|
require.Nil(t, upstream.lastReq, "invalid_encrypted_content 单对象 input 不应回退 HTTP")
|
||||||
|
require.Equal(t, int32(2), wsAttempts.Load(), "单对象 reasoning input 也应触发一次清洗后重试")
|
||||||
|
|
||||||
|
wsRequestMu.Lock()
|
||||||
|
requests := append([][]byte(nil), wsRequestPayloads...)
|
||||||
|
wsRequestMu.Unlock()
|
||||||
|
require.Len(t, requests, 2)
|
||||||
|
require.True(t, gjson.GetBytes(requests[0], `input.encrypted_content`).Exists(), "首轮单对象应保留 encrypted_content")
|
||||||
|
require.True(t, gjson.GetBytes(requests[1], `input.summary.0.text`).Exists(), "恢复重试应保留 reasoning summary")
|
||||||
|
require.False(t, gjson.GetBytes(requests[1], `input.encrypted_content`).Exists(), "恢复重试只应移除 encrypted_content")
|
||||||
|
require.Equal(t, "reasoning", gjson.GetBytes(requests[1], `input.type`).String())
|
||||||
|
require.False(t, gjson.GetBytes(requests[1], `previous_response_id`).Exists(), "恢复重试应移除 previous_response_id")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOpenAIGatewayService_Forward_WSv2InvalidEncryptedContentKeepsPreviousResponseIDForFunctionCallOutput(t *testing.T) {
|
||||||
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
|
var wsAttempts atomic.Int32
|
||||||
|
var wsRequestPayloads [][]byte
|
||||||
|
var wsRequestMu sync.Mutex
|
||||||
|
upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
|
||||||
|
wsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
attempt := wsAttempts.Add(1)
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("upgrade websocket failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var req map[string]any
|
||||||
|
if err := conn.ReadJSON(&req); err != nil {
|
||||||
|
t.Errorf("read ws request failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
reqRaw, _ := json.Marshal(req)
|
||||||
|
wsRequestMu.Lock()
|
||||||
|
wsRequestPayloads = append(wsRequestPayloads, reqRaw)
|
||||||
|
wsRequestMu.Unlock()
|
||||||
|
if attempt == 1 {
|
||||||
|
_ = conn.WriteJSON(map[string]any{
|
||||||
|
"type": "error",
|
||||||
|
"error": map[string]any{
|
||||||
|
"code": "invalid_encrypted_content",
|
||||||
|
"type": "invalid_request_error",
|
||||||
|
"message": "The encrypted content could not be verified.",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = conn.WriteJSON(map[string]any{
|
||||||
|
"type": "response.completed",
|
||||||
|
"response": map[string]any{
|
||||||
|
"id": "resp_ws_invalid_encrypted_content_function_call_output_ok",
|
||||||
|
"model": "gpt-5.3-codex",
|
||||||
|
"usage": map[string]any{
|
||||||
|
"input_tokens": 1,
|
||||||
|
"output_tokens": 1,
|
||||||
|
"input_tokens_details": map[string]any{
|
||||||
|
"cached_tokens": 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer wsServer.Close()
|
||||||
|
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
c, _ := gin.CreateTestContext(rec)
|
||||||
|
c.Request = httptest.NewRequest(http.MethodPost, "/openai/v1/responses", nil)
|
||||||
|
c.Request.Header.Set("User-Agent", "custom-client/1.0")
|
||||||
|
|
||||||
|
upstream := &httpUpstreamRecorder{
|
||||||
|
resp: &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Header: http.Header{"Content-Type": []string{"application/json"}},
|
||||||
|
Body: io.NopCloser(strings.NewReader(`{"id":"resp_http_drop_reasoning","usage":{"input_tokens":1,"output_tokens":1}}`)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := &config.Config{}
|
||||||
|
cfg.Security.URLAllowlist.Enabled = false
|
||||||
|
cfg.Security.URLAllowlist.AllowInsecureHTTP = true
|
||||||
|
cfg.Gateway.OpenAIWS.Enabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.OAuthEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.APIKeyEnabled = true
|
||||||
|
cfg.Gateway.OpenAIWS.ResponsesWebsocketsV2 = true
|
||||||
|
cfg.Gateway.OpenAIWS.FallbackCooldownSeconds = 1
|
||||||
|
|
||||||
|
svc := &OpenAIGatewayService{
|
||||||
|
cfg: cfg,
|
||||||
|
httpUpstream: upstream,
|
||||||
|
openaiWSResolver: NewOpenAIWSProtocolResolver(cfg),
|
||||||
|
toolCorrector: NewCodexToolCorrector(),
|
||||||
|
}
|
||||||
|
|
||||||
|
account := &Account{
|
||||||
|
ID: 98,
|
||||||
|
Name: "openai-apikey",
|
||||||
|
Platform: PlatformOpenAI,
|
||||||
|
Type: AccountTypeAPIKey,
|
||||||
|
Concurrency: 1,
|
||||||
|
Credentials: map[string]any{
|
||||||
|
"api_key": "sk-test",
|
||||||
|
"base_url": wsServer.URL,
|
||||||
|
},
|
||||||
|
Extra: map[string]any{
|
||||||
|
"responses_websockets_v2_enabled": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
body := []byte(`{"model":"gpt-5.3-codex","stream":false,"previous_response_id":"resp_prev_function_call","input":[{"type":"reasoning","encrypted_content":"gAAA"},{"type":"function_call_output","call_id":"call_123","output":"ok"}]}`)
|
||||||
|
result, err := svc.Forward(context.Background(), c, account, body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, result)
|
||||||
|
require.Equal(t, "resp_ws_invalid_encrypted_content_function_call_output_ok", result.RequestID)
|
||||||
|
require.Nil(t, upstream.lastReq, "function_call_output + invalid_encrypted_content 不应回退 HTTP")
|
||||||
|
require.Equal(t, int32(2), wsAttempts.Load(), "应只做一次保锚点的清洗后重试")
|
||||||
|
|
||||||
|
wsRequestMu.Lock()
|
||||||
|
requests := append([][]byte(nil), wsRequestPayloads...)
|
||||||
|
wsRequestMu.Unlock()
|
||||||
|
require.Len(t, requests, 2)
|
||||||
|
require.True(t, gjson.GetBytes(requests[0], "previous_response_id").Exists(), "首轮请求应保留 previous_response_id")
|
||||||
|
require.True(t, gjson.GetBytes(requests[1], "previous_response_id").Exists(), "function_call_output 恢复重试不应移除 previous_response_id")
|
||||||
|
require.False(t, gjson.GetBytes(requests[1], `input.0.encrypted_content`).Exists(), "恢复重试应移除 reasoning encrypted_content")
|
||||||
|
require.Equal(t, "function_call_output", gjson.GetBytes(requests[1], `input.0.type`).String(), "清洗后应保留 function_call_output 作为首个输入项")
|
||||||
|
require.Equal(t, "call_123", gjson.GetBytes(requests[1], `input.0.call_id`).String())
|
||||||
|
require.Equal(t, "ok", gjson.GetBytes(requests[1], `input.0.output`).String())
|
||||||
|
require.Equal(t, "resp_prev_function_call", gjson.GetBytes(requests[1], "previous_response_id").String())
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user