mirror of
https://gitee.com/wanwujie/sub2api
synced 2026-04-09 09:34:46 +08:00
POST /backups 和 POST /backups/:id/restore 改为异步:立即返回 HTTP 202, 后台 goroutine 独立执行 pg_dump → gzip → S3 上传,前端每 2s 轮询状态。 后端: - 新增 StartBackup/StartRestore 方法,后台 goroutine 不依赖 HTTP 连接 - Graceful shutdown 等待活跃操作完成,启动时清理孤立 running 记录 - BackupRecord 新增 progress/restore_status 字段支持进度和恢复状态追踪 前端: - 创建备份/恢复后轮询 GET /backups/:id 直到完成或失败 - 标签页切换暂停/恢复轮询,组件卸载清理定时器 - 正确处理 409(备份进行中)和轮询超时 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
118 lines
3.3 KiB
Go
118 lines
3.3 KiB
Go
package repository
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"fmt"
|
||
"io"
|
||
"time"
|
||
|
||
"github.com/aws/aws-sdk-go-v2/aws"
|
||
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
||
awsconfig "github.com/aws/aws-sdk-go-v2/config"
|
||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||
|
||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||
)
|
||
|
||
// S3BackupStore implements service.BackupObjectStore using AWS S3 compatible storage
|
||
type S3BackupStore struct {
|
||
client *s3.Client
|
||
bucket string
|
||
}
|
||
|
||
// NewS3BackupStoreFactory returns a BackupObjectStoreFactory that creates S3-backed stores
|
||
func NewS3BackupStoreFactory() service.BackupObjectStoreFactory {
|
||
return func(ctx context.Context, cfg *service.BackupS3Config) (service.BackupObjectStore, error) {
|
||
region := cfg.Region
|
||
if region == "" {
|
||
region = "auto" // Cloudflare R2 默认 region
|
||
}
|
||
|
||
awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
|
||
awsconfig.WithRegion(region),
|
||
awsconfig.WithCredentialsProvider(
|
||
credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
|
||
),
|
||
)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("load aws config: %w", err)
|
||
}
|
||
|
||
client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
|
||
if cfg.Endpoint != "" {
|
||
o.BaseEndpoint = &cfg.Endpoint
|
||
}
|
||
if cfg.ForcePathStyle {
|
||
o.UsePathStyle = true
|
||
}
|
||
o.APIOptions = append(o.APIOptions, v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware)
|
||
o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired
|
||
})
|
||
|
||
return &S3BackupStore{client: client, bucket: cfg.Bucket}, nil
|
||
}
|
||
}
|
||
|
||
func (s *S3BackupStore) Upload(ctx context.Context, key string, body io.Reader, contentType string) (int64, error) {
|
||
// 读取全部内容以获取大小(S3 PutObject 需要知道内容长度)
|
||
// 注意:阿里云 OSS 不兼容 s3manager 分片上传的签名方式,因此使用 PutObject
|
||
data, err := io.ReadAll(body)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("read body: %w", err)
|
||
}
|
||
|
||
_, err = s.client.PutObject(ctx, &s3.PutObjectInput{
|
||
Bucket: &s.bucket,
|
||
Key: &key,
|
||
Body: bytes.NewReader(data),
|
||
ContentType: &contentType,
|
||
})
|
||
if err != nil {
|
||
return 0, fmt.Errorf("S3 PutObject: %w", err)
|
||
}
|
||
return int64(len(data)), nil
|
||
}
|
||
|
||
func (s *S3BackupStore) Download(ctx context.Context, key string) (io.ReadCloser, error) {
|
||
result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
|
||
Bucket: &s.bucket,
|
||
Key: &key,
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("S3 GetObject: %w", err)
|
||
}
|
||
return result.Body, nil
|
||
}
|
||
|
||
func (s *S3BackupStore) Delete(ctx context.Context, key string) error {
|
||
_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
||
Bucket: &s.bucket,
|
||
Key: &key,
|
||
})
|
||
return err
|
||
}
|
||
|
||
func (s *S3BackupStore) PresignURL(ctx context.Context, key string, expiry time.Duration) (string, error) {
|
||
presignClient := s3.NewPresignClient(s.client)
|
||
result, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
|
||
Bucket: &s.bucket,
|
||
Key: &key,
|
||
}, s3.WithPresignExpires(expiry))
|
||
if err != nil {
|
||
return "", fmt.Errorf("presign url: %w", err)
|
||
}
|
||
return result.URL, nil
|
||
}
|
||
|
||
func (s *S3BackupStore) HeadBucket(ctx context.Context) error {
|
||
_, err := s.client.HeadBucket(ctx, &s3.HeadBucketInput{
|
||
Bucket: &s.bucket,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("S3 HeadBucket failed: %w", err)
|
||
}
|
||
return nil
|
||
}
|