mirror of
https://gitee.com/wanwujie/sub2api
synced 2026-05-01 11:50:44 +08:00
fix: throttle Anthropic usage queries and pass through upstream HTTP errors
- Frontend: queue Anthropic OAuth/setup-token usage requests per proxy, with a random 1–1.5s interval between requests, to prevent upstream 429s
- Backend: return ApplicationError with the actual upstream status code instead of wrapping all errors as 500
- Frontend: handle component unmount to skip stale updates on page navigation
This commit is contained in:
@@ -1 +1 @@
|
|||||||
0.1.90.3
|
0.1.90.4
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
|
||||||
"github.com/Wei-Shaw/sub2api/internal/pkg/httpclient"
|
"github.com/Wei-Shaw/sub2api/internal/pkg/httpclient"
|
||||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||||
)
|
)
|
||||||
@@ -95,7 +96,8 @@ func (s *claudeUsageService) FetchUsageWithOptions(ctx context.Context, opts *se
|
|||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
body, _ := io.ReadAll(resp.Body)
|
body, _ := io.ReadAll(resp.Body)
|
||||||
return nil, fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
|
msg := fmt.Sprintf("API returned status %d: %s", resp.StatusCode, string(body))
|
||||||
|
return nil, infraerrors.New(resp.StatusCode, "UPSTREAM_ERROR", msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
var usageResp service.ClaudeUsageResponse
|
var usageResp service.ClaudeUsageResponse
|
||||||
|
|||||||
@@ -278,11 +278,12 @@
|
|||||||
</template>
|
</template>
|
||||||
|
|
||||||
<script setup lang="ts">
|
<script setup lang="ts">
|
||||||
import { ref, computed, onMounted } from 'vue'
|
import { ref, computed, onMounted, onBeforeUnmount } from 'vue'
|
||||||
import { useI18n } from 'vue-i18n'
|
import { useI18n } from 'vue-i18n'
|
||||||
import { adminAPI } from '@/api/admin'
|
import { adminAPI } from '@/api/admin'
|
||||||
import type { Account, AccountUsageInfo, GeminiCredentials, WindowStats } from '@/types'
|
import type { Account, AccountUsageInfo, GeminiCredentials, WindowStats } from '@/types'
|
||||||
import { resolveCodexUsageWindow } from '@/utils/codexUsage'
|
import { resolveCodexUsageWindow } from '@/utils/codexUsage'
|
||||||
|
import { enqueueUsageRequest } from '@/utils/usageLoadQueue'
|
||||||
import UsageProgressBar from './UsageProgressBar.vue'
|
import UsageProgressBar from './UsageProgressBar.vue'
|
||||||
import AccountQuotaInfo from './AccountQuotaInfo.vue'
|
import AccountQuotaInfo from './AccountQuotaInfo.vue'
|
||||||
|
|
||||||
@@ -292,6 +293,9 @@ const props = defineProps<{
|
|||||||
|
|
||||||
const { t } = useI18n()
|
const { t } = useI18n()
|
||||||
|
|
||||||
|
const unmounted = ref(false)
|
||||||
|
onBeforeUnmount(() => { unmounted.value = true })
|
||||||
|
|
||||||
const loading = ref(false)
|
const loading = ref(false)
|
||||||
const error = ref<string | null>(null)
|
const error = ref<string | null>(null)
|
||||||
const usageInfo = ref<AccountUsageInfo | null>(null)
|
const usageInfo = ref<AccountUsageInfo | null>(null)
|
||||||
@@ -701,12 +705,30 @@ const loadUsage = async () => {
|
|||||||
error.value = null
|
error.value = null
|
||||||
|
|
||||||
try {
|
try {
|
||||||
usageInfo.value = await adminAPI.accounts.getUsage(props.account.id)
|
const fetchFn = () => adminAPI.accounts.getUsage(props.account.id)
|
||||||
|
let result: AccountUsageInfo
|
||||||
|
// Only throttle Anthropic OAuth/setup-token accounts to avoid upstream 429
|
||||||
|
if (
|
||||||
|
props.account.platform === 'anthropic' &&
|
||||||
|
(props.account.type === 'oauth' || props.account.type === 'setup-token')
|
||||||
|
) {
|
||||||
|
result = await enqueueUsageRequest(
|
||||||
|
props.account.platform,
|
||||||
|
'claude_code',
|
||||||
|
props.account.proxy_id,
|
||||||
|
fetchFn
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
result = await fetchFn()
|
||||||
|
}
|
||||||
|
if (!unmounted.value) usageInfo.value = result
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
error.value = t('common.error')
|
if (!unmounted.value) {
|
||||||
console.error('Failed to load usage:', e)
|
error.value = t('common.error')
|
||||||
|
console.error('Failed to load usage:', e)
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
loading.value = false
|
if (!unmounted.value) loading.value = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,10 @@ vi.mock('@/api/admin', () => ({
|
|||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
vi.mock('@/utils/usageLoadQueue', () => ({
|
||||||
|
enqueueUsageRequest: (_p: string, _t: string, _id: unknown, fn: () => Promise<unknown>) => fn()
|
||||||
|
}))
|
||||||
|
|
||||||
vi.mock('vue-i18n', async () => {
|
vi.mock('vue-i18n', async () => {
|
||||||
const actual = await vi.importActual<typeof import('vue-i18n')>('vue-i18n')
|
const actual = await vi.importActual<typeof import('vue-i18n')>('vue-i18n')
|
||||||
return {
|
return {
|
||||||
|
|||||||
87
frontend/src/utils/__tests__/usageLoadQueue.spec.ts
Normal file
87
frontend/src/utils/__tests__/usageLoadQueue.spec.ts
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
import { describe, expect, it, vi } from 'vitest'
|
||||||
|
import { enqueueUsageRequest } from '../usageLoadQueue'
|
||||||
|
|
||||||
|
function delay(ms: number) {
|
||||||
|
return new Promise((r) => setTimeout(r, ms))
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('usageLoadQueue', () => {
|
||||||
|
it('同组请求串行执行,间隔 >= 1s', async () => {
|
||||||
|
const timestamps: number[] = []
|
||||||
|
const makeFn = () => async () => {
|
||||||
|
timestamps.push(Date.now())
|
||||||
|
return 'ok'
|
||||||
|
}
|
||||||
|
|
||||||
|
const p1 = enqueueUsageRequest('anthropic', 'oauth', 1, makeFn())
|
||||||
|
const p2 = enqueueUsageRequest('anthropic', 'oauth', 1, makeFn())
|
||||||
|
const p3 = enqueueUsageRequest('anthropic', 'oauth', 1, makeFn())
|
||||||
|
|
||||||
|
await Promise.all([p1, p2, p3])
|
||||||
|
|
||||||
|
expect(timestamps).toHaveLength(3)
|
||||||
|
// 随机 1-1.5s 间隔,至少 950ms(留一点误差)
|
||||||
|
expect(timestamps[1] - timestamps[0]).toBeGreaterThanOrEqual(950)
|
||||||
|
expect(timestamps[1] - timestamps[0]).toBeLessThan(1600)
|
||||||
|
expect(timestamps[2] - timestamps[1]).toBeGreaterThanOrEqual(950)
|
||||||
|
expect(timestamps[2] - timestamps[1]).toBeLessThan(1600)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('不同组请求并行执行', async () => {
|
||||||
|
const timestamps: Record<string, number> = {}
|
||||||
|
const makeTracked = (key: string) => async () => {
|
||||||
|
timestamps[key] = Date.now()
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
|
const p1 = enqueueUsageRequest('anthropic', 'oauth', 1, makeTracked('group1'))
|
||||||
|
const p2 = enqueueUsageRequest('anthropic', 'oauth', 2, makeTracked('group2'))
|
||||||
|
const p3 = enqueueUsageRequest('gemini', 'oauth', 1, makeTracked('group3'))
|
||||||
|
|
||||||
|
await Promise.all([p1, p2, p3])
|
||||||
|
|
||||||
|
// 不同组应几乎同时启动(差距 < 50ms)
|
||||||
|
const values = Object.values(timestamps)
|
||||||
|
const spread = Math.max(...values) - Math.min(...values)
|
||||||
|
expect(spread).toBeLessThan(50)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('请求失败时 reject,后续任务继续执行', async () => {
|
||||||
|
const results: string[] = []
|
||||||
|
|
||||||
|
const p1 = enqueueUsageRequest('anthropic', 'oauth', 99, async () => {
|
||||||
|
throw new Error('fail')
|
||||||
|
})
|
||||||
|
const p2 = enqueueUsageRequest('anthropic', 'oauth', 99, async () => {
|
||||||
|
results.push('second')
|
||||||
|
return 'ok'
|
||||||
|
})
|
||||||
|
|
||||||
|
await expect(p1).rejects.toThrow('fail')
|
||||||
|
await p2
|
||||||
|
expect(results).toEqual(['second'])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('返回值正确透传', async () => {
|
||||||
|
const result = await enqueueUsageRequest('test', 'oauth', null, async () => {
|
||||||
|
return { usage: 42 }
|
||||||
|
})
|
||||||
|
expect(result).toEqual({ usage: 42 })
|
||||||
|
})
|
||||||
|
|
||||||
|
it('proxy_id 为 null 的账号归为同一组', async () => {
|
||||||
|
const order: number[] = []
|
||||||
|
const makeFn = (n: number) => async () => {
|
||||||
|
order.push(n)
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
const p1 = enqueueUsageRequest('anthropic', 'oauth', null, makeFn(1))
|
||||||
|
const p2 = enqueueUsageRequest('anthropic', 'oauth', null, makeFn(2))
|
||||||
|
|
||||||
|
await Promise.all([p1, p2])
|
||||||
|
|
||||||
|
// 同组串行,按入队顺序执行
|
||||||
|
expect(order).toEqual([1, 2])
|
||||||
|
})
|
||||||
|
})
|
||||||
72
frontend/src/utils/usageLoadQueue.ts
Normal file
72
frontend/src/utils/usageLoadQueue.ts
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
/**
|
||||||
|
* Usage request queue that throttles API calls by group.
|
||||||
|
*
|
||||||
|
* Accounts sharing the same upstream (platform + type + proxy) are placed
|
||||||
|
* into a single serial queue with a configurable delay between requests,
|
||||||
|
* preventing upstream 429 rate-limit errors.
|
||||||
|
*
|
||||||
|
* Different groups run in parallel since they hit different upstreams.
|
||||||
|
*/
|
||||||
|
|
||||||
|
const GROUP_DELAY_MIN_MS = 1000
|
||||||
|
const GROUP_DELAY_MAX_MS = 1500
|
||||||
|
|
||||||
|
type Task<T> = {
|
||||||
|
fn: () => Promise<T>
|
||||||
|
resolve: (value: T) => void
|
||||||
|
reject: (reason: unknown) => void
|
||||||
|
}
|
||||||
|
|
||||||
|
const queues = new Map<string, Task<unknown>[]>()
|
||||||
|
const running = new Set<string>()
|
||||||
|
|
||||||
|
function buildGroupKey(platform: string, type: string, proxyId: number | null): string {
|
||||||
|
return `${platform}:${type}:${proxyId ?? 'direct'}`
|
||||||
|
}
|
||||||
|
|
||||||
|
async function drain(groupKey: string) {
|
||||||
|
if (running.has(groupKey)) return
|
||||||
|
running.add(groupKey)
|
||||||
|
|
||||||
|
const queue = queues.get(groupKey)
|
||||||
|
while (queue && queue.length > 0) {
|
||||||
|
const task = queue.shift()!
|
||||||
|
try {
|
||||||
|
const result = await task.fn()
|
||||||
|
task.resolve(result)
|
||||||
|
} catch (err) {
|
||||||
|
task.reject(err)
|
||||||
|
}
|
||||||
|
// Wait a random 1–1.5s before next request in the same group
|
||||||
|
if (queue.length > 0) {
|
||||||
|
const jitter = GROUP_DELAY_MIN_MS + Math.random() * (GROUP_DELAY_MAX_MS - GROUP_DELAY_MIN_MS)
|
||||||
|
await new Promise((r) => setTimeout(r, jitter))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
running.delete(groupKey)
|
||||||
|
queues.delete(groupKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enqueue a usage fetch call. Returns a promise that resolves when the
|
||||||
|
* request completes (after waiting its turn in the group queue).
|
||||||
|
*/
|
||||||
|
export function enqueueUsageRequest<T>(
|
||||||
|
platform: string,
|
||||||
|
type: string,
|
||||||
|
proxyId: number | null,
|
||||||
|
fn: () => Promise<T>
|
||||||
|
): Promise<T> {
|
||||||
|
const key = buildGroupKey(platform, type, proxyId)
|
||||||
|
|
||||||
|
return new Promise<T>((resolve, reject) => {
|
||||||
|
let queue = queues.get(key)
|
||||||
|
if (!queue) {
|
||||||
|
queue = []
|
||||||
|
queues.set(key, queue)
|
||||||
|
}
|
||||||
|
queue.push({ fn, resolve, reject } as Task<unknown>)
|
||||||
|
drain(key)
|
||||||
|
})
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user