Logo

Cloudflare R2 存储服务的命令行工具

2025年5月17日 · 1257

1. 工具讲解

此工具使用Golang编写

本程序是一个用于将本地文件上传到 Cloudflare R2 存储服务的命令行工具。它支持多文件并发上传、进度显示、自动重试机制,并提供详细的日志输出和错误处理。 编译后示例运行界面: 请输入你的 Cloudflare Account ID: xxxxxxxx 请输入你的 Access Key ID: xxxxxxxx 请输入你的 Access Key Secret: ********* 请输入你要上传到的 Bucket 名称: my-bucket 请输入要上传的文件路径(多个用逗号分隔): /path/to/file1.jpg, /path/to/file2.mp4 请输入最大并发数(默认 8):

🚀 开始上传 2 个文件(最大并发数: 8) [video.mp4] 进度: 60% [===================== ] 15.6 MB>26.2 MB ✅ 上传成功: https://xxxxxx.r2.cloudflarestorage.com/my-bucket/video.mp4 ... 🎉 所有文件上传完成!

2. 适用场景

个人或企业批量上传图片、视频、备份文件至 Cloudflare R2 作为自动化脚本的一部分,集成到 CI/CD 流程中 替代传统 FTP/SFTP 方式,更安全高效地上传静态资源

3. 代码

package main

import (
    "bufio"
    "context"
    "fmt"
    "io"
    "log"
    "mime"
    "os"
    "os/signal"
    "path/filepath"
    "runtime"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// 修改 promptInput 函数以处理错误
func promptInput(reader *bufio.Reader, prompt string) string {
    fmt.Print(prompt)
    input, err := reader.ReadString('\n')
    if err != nil {
        log.Fatalf("读取输入失败: %v", err)
    }
    return strings.TrimSpace(input)
}

func fileExists(path string) bool {
    _, err := os.Stat(path)
    return err == nil
}

// validateConnection verifies that the configured credentials can reach
// bucketName by issuing a HeadBucket request before any upload starts.
// The SDK error is wrapped with %w (the original used %v) so callers
// can inspect the root cause with errors.Is / errors.As.
func validateConnection(client *s3.Client, bucketName string) error {
    _, err := client.HeadBucket(context.TODO(), &s3.HeadBucketInput{
        Bucket: aws.String(bucketName),
    })
    if err != nil {
        return fmt.Errorf("无法访问 Bucket: %w", err)
    }
    return nil
}

func detectContentType(filePath string) string {
    contentType := "application/octet-stream"
    if ext := filepath.Ext(filePath); ext != "" {
        if detected := mime.TypeByExtension(ext); detected != "" {
            contentType = detected
        }
    }
    return contentType
}

// formatBytes renders a byte count as a human-readable string with one
// decimal place, scaling through B/KB/MB/GB/TB in steps of 1024.
func formatBytes(bytes int64) string {
    const step = 1024.0
    suffixes := []string{"B", "KB", "MB", "GB", "TB"}
    size := float64(bytes)
    idx := 0
    for size >= step && idx < len(suffixes)-1 {
        size /= step
        idx++
    }
    return fmt.Sprintf("%.1f %s", size, suffixes[idx])
}

// progressReader wraps an open file and renders a console progress bar
// as bytes are consumed from it during upload.
type progressReader struct {
    reader       *os.File    // underlying file being uploaded
    size         int64       // total file size in bytes
    read         int64       // bytes handed out so far
    progressLock *sync.Mutex // serializes progress output across readers
    lastPercent  *int        // highest percentage printed so far
    key          string      // object key shown in the progress line
}

// Read forwards to the wrapped file and, whenever the completed
// percentage increases, redraws a 50-character progress bar on the
// current console line.
func (r *progressReader) Read(p []byte) (int, error) {
    n, err := r.reader.Read(p)
    if n <= 0 {
        return n, err
    }
    r.read += int64(n)

    r.progressLock.Lock()
    defer r.progressLock.Unlock()

    pct := int(float64(r.read) / float64(r.size) * 100)
    if pct <= *r.lastPercent {
        return n, err
    }
    *r.lastPercent = pct
    fmt.Printf("\r[%s] 进度: %3d%% [%s>%s] %s/%s",
        r.key, pct,
        strings.Repeat("=", pct/2),
        strings.Repeat(" ", 50-pct/2),
        formatBytes(r.read),
        formatBytes(r.size))
    return n, err
}

// uploadFile uploads a single local file to bucketName under its base
// name, retrying up to retries times with a 2-second pause between
// attempts. On success it returns the public R2 URL of the object; on
// failure the last attempt's error is wrapped into the returned error.
func uploadFile(client *s3.Client, accountID, bucketName, filePath string, retries int) (string, error) {
    // Ensure at least one attempt so the final error wrap is meaningful.
    if retries < 1 {
        retries = 1
    }

    file, err := os.Open(filePath)
    if err != nil {
        return "", fmt.Errorf("无法打开文件: %w", err)
    }
    defer file.Close()

    // The original ignored Stat's error, risking a nil dereference.
    fileInfo, err := file.Stat()
    if err != nil {
        return "", fmt.Errorf("无法读取文件信息: %w", err)
    }
    totalSize := fileInfo.Size()
    key := filepath.Base(filePath)
    contentType := detectContentType(filePath)

    uploader := manager.NewUploader(client, func(u *manager.Uploader) {
        u.PartSize = 5 * 1024 * 1024 // 5 MB parts
        u.Concurrency = 2            // per-file part concurrency
        u.BufferProvider = manager.NewBufferedReadSeekerWriteToPool(5 * 1024 * 1024)
    })

    var progressLock sync.Mutex

    var lastErr error
    for attempt := 1; attempt <= retries; attempt++ {
        log.Printf("尝试第 %d 次上传: %s", attempt, filePath)

        // Rewind so a retry re-reads the file from the beginning.
        if _, err := file.Seek(0, io.SeekStart); err != nil {
            return "", fmt.Errorf("重置文件指针失败: %w", err)
        }

        input := &s3.PutObjectInput{
            Bucket:      aws.String(bucketName),
            Key:         aws.String(key),
            Body:        file,
            ContentType: aws.String(contentType),
        }

        if totalSize > 0 {
            // Fresh percentage tracker per attempt: the original shared
            // one across retries, so the bar never redrew after a
            // failed attempt.
            lastPercent := 0
            input.Body = &progressReader{
                reader:       file,
                size:         totalSize,
                progressLock: &progressLock,
                lastPercent:  &lastPercent,
                key:          key,
            }
        }

        _, lastErr = uploader.Upload(context.TODO(), input)
        if lastErr == nil {
            url := fmt.Sprintf("https://%s.r2.cloudflarestorage.com/%s/%s", accountID, bucketName, key)
            fmt.Printf("\n✅ 上传成功: %s\n", url)
            return url, nil
        }

        log.Printf("⚠️ 第 %d 次上传失败: %v", attempt, lastErr)
        if attempt < retries {
            time.Sleep(2 * time.Second)
        }
    }
    // Wrap the final attempt's error so callers see the root cause
    // (the original discarded it entirely).
    return "", fmt.Errorf("❌ 文件上传失败: %s: %w", filePath, lastErr)
}

// uploadFilesConcurrently uploads every path in filePaths, running at
// most maxConcurrency uploads in parallel via a buffered-channel
// semaphore. Cancelling ctx stops goroutines that have not yet begun
// uploading; the call blocks until all started work has finished.
func uploadFilesConcurrently(ctx context.Context, client *s3.Client, accountID, bucketName string, filePaths []string, maxConcurrency int) {
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, maxConcurrency)

    for _, path := range filePaths {
        wg.Add(1)
        go func(filePath string) {
            defer wg.Done()

            // Wait for an upload slot, but give up immediately if the
            // context is cancelled while queued — the original only
            // checked ctx after the (possibly long) semaphore wait.
            select {
            case semaphore <- struct{}{}:
            case <-ctx.Done():
                log.Printf("上传取消: %s", filePath)
                return
            }
            defer func() { <-semaphore }()

            // Re-check after acquiring the slot, matching the original
            // pre-upload cancellation check.
            select {
            case <-ctx.Done():
                log.Printf("上传取消: %s", filePath)
                return
            default:
            }

            if _, err := uploadFile(client, accountID, bucketName, filePath, 3); err != nil {
                log.Printf("❌ 上传失败: %v", err)
            }
        }(path)
    }
    wg.Wait()
}

// main drives the interactive workflow: collect credentials and file
// paths from stdin, build an R2-targeted S3 client, verify bucket
// access, then upload all files concurrently. Ctrl-C cancels uploads
// that have not yet started.
func main() {
    reader := bufio.NewReader(os.Stdin)

    accountID := promptInput(reader, "请输入你的 Cloudflare Account ID: ")
    accessKeyID := promptInput(reader, "请输入你的 Access Key ID: ")
    accessKeySecret := promptInput(reader, "请输入你的 Access Key Secret: ")
    bucketName := promptInput(reader, "请输入你要上传到的 Bucket 名称: ")
    filePathsInput := promptInput(reader, "请输入要上传的文件路径(多个用逗号分隔): ")

    // Validate every path up front so a typo aborts before any upload.
    filePaths := make([]string, 0)
    for _, path := range strings.Split(filePathsInput, ",") {
        cleanedPath := strings.TrimSpace(path)
        if cleanedPath != "" {
            if !fileExists(cleanedPath) {
                log.Fatalf("❌ 文件不存在: %s", cleanedPath)
            }
            filePaths = append(filePaths, cleanedPath)
        }
    }

    // Default the worker count to the CPU count; accept an override.
    maxConcurrency := runtime.NumCPU()
    concurrencyInput := promptInput(reader, fmt.Sprintf("请输入最大并发数(默认 %d): ", maxConcurrency))
    if concurrencyInput != "" {
        if num, err := strconv.Atoi(concurrencyInput); err == nil && num > 0 {
            maxConcurrency = num
        } else {
            log.Printf("⚠️ 无效的并发数,使用默认值 %d", maxConcurrency)
        }
    }

    // Point every S3 request at the account-scoped R2 endpoint.
    r2Resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
        return aws.Endpoint{
            URL: fmt.Sprintf("https://%s.r2.cloudflarestorage.com", accountID),
        }, nil
    })

    cfg, err := config.LoadDefaultConfig(context.TODO(),
        config.WithEndpointResolverWithOptions(r2Resolver),
        config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKeyID, accessKeySecret, "")),
        config.WithRegion("auto"),
    )
    if err != nil {
        log.Fatalf("❌ 无法加载配置: %v", err)
    }

    client := s3.NewFromConfig(cfg)

    if err := validateConnection(client, bucketName); err != nil {
        log.Fatalf("❌ 连接验证失败: %v", err)
    }

    log.Printf("🚀 开始上传 %d 个文件(最大并发数: %d)", len(filePaths), maxConcurrency)

    // Use a signal-aware context so Ctrl-C cancels queued uploads; the
    // original passed a bare Background context that could never fire,
    // making the cancellation path in uploadFilesConcurrently dead code.
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()
    uploadFilesConcurrently(ctx, client, accountID, bucketName, filePaths, maxConcurrency)
    log.Println("\n🎉 所有文件上传完成!")
}