Cloudflare R2 存储服务的命令行工具
2025年5月17日 · 1257 字
1.工具讲解
此工具使用Golang编写
本程序是一个用于将本地文件上传到 Cloudflare R2 存储服务的命令行工具。它支持多文件并发上传、进度显示、自动重试机制,并提供详细的日志输出和错误处理。 编译后示例运行界面: 请输入你的 Cloudflare Account ID: xxxxxxxx 请输入你的 Access Key ID: xxxxxxxx 请输入你的 Access Key Secret: ********* 请输入你要上传到的 Bucket 名称: my-bucket 请输入要上传的文件路径(多个用逗号分隔): /path/to/file1.jpg, /path/to/file2.mp4 请输入最大并发数(默认 8):
🚀 开始上传 2 个文件(最大并发数: 8) [video.mp4] 进度: 60% [===================== ] 15.6 MB>26.2 MB ✅ 上传成功: https://xxxxxx.r2.cloudflarestorage.com/my-bucket/video.mp4 ... 🎉 所有文件上传完成!
2. 适用场景
个人或企业批量上传图片、视频、备份文件至 Cloudflare R2 作为自动化脚本的一部分,集成到 CI/CD 流程中 替代传统 FTP/SFTP 方式,更安全高效地上传静态资源
4.代码
package main
import (
"bufio"
"context"
"fmt"
"log"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"mime"
)
// 修改 promptInput 函数以处理错误
func promptInput(reader *bufio.Reader, prompt string) string {
fmt.Print(prompt)
input, err := reader.ReadString('\n')
if err != nil {
log.Fatalf("读取输入失败: %v", err)
}
return strings.TrimSpace(input)
}
func fileExists(path string) bool {
_, err := os.Stat(path)
return err == nil
}
func validateConnection(client *s3.Client, bucketName string) error {
_, err := client.HeadBucket(context.TODO(), &s3.HeadBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
return fmt.Errorf("无法访问 Bucket: %v", err)
}
return nil
}
func detectContentType(filePath string) string {
contentType := "application/octet-stream"
if ext := filepath.Ext(filePath); ext != "" {
if detected := mime.TypeByExtension(ext); detected != "" {
contentType = detected
}
}
return contentType
}
func formatBytes(bytes int64) string {
units := []string{"B", "KB", "MB", "GB", "TB"}
var i int
value := float64(bytes)
for i = 0; i < len(units)-1; i++ {
if value < 1024 {
break
}
value /= 1024
}
return fmt.Sprintf("%.1f %s", value, units[i])
}
// 添加一个自定义的进度读取器 - MOVED UP BEFORE uploadFile
type progressReader struct {
reader *os.File
size int64
read int64
progressLock *sync.Mutex
lastPercent *int
key string
}
func (r *progressReader) Read(p []byte) (int, error) {
n, err := r.reader.Read(p)
if n > 0 {
r.read += int64(n)
r.progressLock.Lock()
defer r.progressLock.Unlock()
percent := int((float64(r.read)/float64(r.size))*100)
if percent > *r.lastPercent {
*r.lastPercent = percent
fmt.Printf("\r[%s] 进度: %3d%% [%s>%s] %s/%s",
r.key, percent,
strings.Repeat("=", percent/2),
strings.Repeat(" ", 50-percent/2),
formatBytes(r.read),
formatBytes(r.size))
}
}
return n, err
}
// 在 uploadFile 函数中修改变量名,避免与类型名冲突
func uploadFile(client *s3.Client, accountID, bucketName, filePath string, retries int) (string, error) {
file, err := os.Open(filePath)
if err != nil {
return "", fmt.Errorf("无法打开文件: %w", err)
}
defer file.Close()
fileInfo, _ := file.Stat()
totalSize := fileInfo.Size()
key := filepath.Base(filePath)
contentType := detectContentType(filePath)
uploader := manager.NewUploader(client, func(u *manager.Uploader) {
u.PartSize = 5 * 1024 * 1024 // 5MB分片
u.Concurrency = 2 // 单文件并发数
u.BufferProvider = manager.NewBufferedReadSeekerWriteToPool(5 * 1024 * 1024)
})
var progressLock sync.Mutex
var lastPercent int
// 在 uploadFile 函数中添加文件指针重置
for i := 1; i <= retries; i++ {
log.Printf("尝试第 %d 次上传: %s", i, filePath)
// 重置文件指针到开始位置
if _, err := file.Seek(0, 0); err != nil {
return "", fmt.Errorf("重置文件指针失败: %w", err)
}
input := &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: file,
ContentType: aws.String(contentType),
}
// 修改:使用不同的变量名,避免与类型名冲突
if totalSize > 0 {
// 创建一个进度读取器实例
progReader := &progressReader{
reader: file,
size: totalSize,
progressLock: &progressLock,
lastPercent: &lastPercent,
key: key,
}
input.Body = progReader
}
_, err = uploader.Upload(context.TODO(), input)
if err == nil {
url := fmt.Sprintf("https://%s.r2.cloudflarestorage.com/%s/%s", accountID, bucketName, key)
fmt.Printf("\n✅ 上传成功: %s\n", url)
return url, nil
}
log.Printf("⚠️ 第 %d 次上传失败: %v", i, err)
if i < retries {
time.Sleep(2 * time.Second)
}
}
return "", fmt.Errorf("❌ 文件上传失败: %s", filePath)
}
// 改进 uploadFilesConcurrently 函数,添加 context 支持
func uploadFilesConcurrently(ctx context.Context, client *s3.Client, accountID, bucketName string, filePaths []string, maxConcurrency int) {
var wg sync.WaitGroup
semaphore := make(chan struct{}, maxConcurrency)
for _, path := range filePaths {
wg.Add(1)
go func(filePath string) {
defer wg.Done()
semaphore <- struct{}{}
defer func() { <-semaphore }()
select {
case <-ctx.Done():
log.Printf("上传取消: %s", filePath)
return
default:
if _, err := uploadFile(client, accountID, bucketName, filePath, 3); err != nil {
log.Printf("❌ 上传失败: %v", err)
}
}
}(path)
}
wg.Wait()
}
func main() {
reader := bufio.NewReader(os.Stdin)
accountID := promptInput(reader, "请输入你的 Cloudflare Account ID: ")
accessKeyID := promptInput(reader, "请输入你的 Access Key ID: ")
accessKeySecret := promptInput(reader, "请输入你的 Access Key Secret: ")
bucketName := promptInput(reader, "请输入你要上传到的 Bucket 名称: ")
filePathsInput := promptInput(reader, "请输入要上传的文件路径(多个用逗号分隔): ")
filePaths := make([]string, 0)
for _, path := range strings.Split(filePathsInput, ",") {
cleanedPath := strings.TrimSpace(path)
if cleanedPath != "" {
if !fileExists(cleanedPath) {
log.Fatalf("❌ 文件不存在: %s", cleanedPath)
}
filePaths = append(filePaths, cleanedPath)
}
}
maxConcurrency := runtime.NumCPU()
concurrencyInput := promptInput(reader, fmt.Sprintf("请输入最大并发数(默认 %d): ", maxConcurrency))
if concurrencyInput != "" {
if num, err := strconv.Atoi(concurrencyInput); err == nil && num > 0 {
maxConcurrency = num
} else {
log.Printf("⚠️ 无效的并发数,使用默认值 %d", maxConcurrency)
}
}
r2Resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: fmt.Sprintf("https://%s.r2.cloudflarestorage.com", accountID),
}, nil
})
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithEndpointResolverWithOptions(r2Resolver),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKeyID, accessKeySecret, "")),
config.WithRegion("auto"),
)
if err != nil {
log.Fatalf("❌ 无法加载配置: %v", err)
}
client := s3.NewFromConfig(cfg)
if err := validateConnection(client, bucketName); err != nil {
log.Fatalf("❌ 连接验证失败: %v", err)
}
log.Printf("🚀 开始上传 %d 个文件(最大并发数: %d)", len(filePaths), maxConcurrency)
// 修改:添加 context 参数
ctx := context.Background()
uploadFilesConcurrently(ctx, client, accountID, bucketName, filePaths, maxConcurrency)
log.Println("\n🎉 所有文件上传完成!")
}