tune concurrency manager

- higher initial concurrency
- lower cooldown after ramping up
- lower threshold for ramp up
This commit is contained in:
Michael Yang 2024-03-07 14:18:25 -08:00
parent 2ada81e068
commit daf928fe1a

View File

@ -12,6 +12,7 @@ import (
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -151,7 +152,7 @@ func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *regis
_ = file.Truncate(b.Total) _ = file.Truncate(b.Total)
var limit int64 = 2 limit := int64(runtime.NumCPU())
g, inner := NewLimitGroup(ctx, numDownloadParts, limit) g, inner := NewLimitGroup(ctx, numDownloadParts, limit)
go watchDelta(inner, g, &b.Completed, limit) go watchDelta(inner, g, &b.Completed, limit)
@ -424,8 +425,8 @@ func watchDelta(ctx context.Context, g *LimitGroup, c *atomic.Int64, limit int64
var maxDelta float64 var maxDelta float64
var buckets []int64 var buckets []int64
// 5s ramp up period // 3s ramp up period
nextUpdate := time.Now().Add(5 * time.Second) nextUpdate := time.Now().Add(3 * time.Second)
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
for { for {
@ -446,7 +447,7 @@ func watchDelta(ctx context.Context, g *LimitGroup, c *atomic.Int64, limit int64
continue continue
} else if maxDelta > 0 { } else if maxDelta > 0 {
x := delta / maxDelta x := delta / maxDelta
if x < 1.2 { if x < 1 {
continue continue
} }
@ -456,7 +457,7 @@ func watchDelta(ctx context.Context, g *LimitGroup, c *atomic.Int64, limit int64
} }
// 3s cooldown period // 3s cooldown period
nextUpdate = time.Now().Add(3 * time.Second) nextUpdate = time.Now().Add(2 * time.Second)
maxDelta = delta maxDelta = delta
case <-ctx.Done(): case <-ctx.Done():