diff --git a/registry/server.go b/registry/server.go index 0059cf07..6d99669a 100644 --- a/registry/server.go +++ b/registry/server.go @@ -7,19 +7,28 @@ import ( "context" "errors" "log" + "math/rand" "net/http" + "net/url" "os" "path" + "strconv" "time" "bllamo.com/build/blob" "bllamo.com/client/ollama" "bllamo.com/oweb" "bllamo.com/registry/apitype" + "bllamo.com/utils/upload" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" ) +// Defaults +const ( + DefaultUploadChunkSize = 50 * 1024 * 1024 +) + // TODO(bmizerany): move all env things to package envkobs? var defaultLibrary = cmp.Or(os.Getenv("OLLAMA_REGISTRY"), "registry.ollama.ai/library") @@ -28,7 +37,8 @@ func DefaultLibrary() string { } type Server struct { - minioClient *minio.Client + UploadChunkSize int64 // default is DefaultUploadChunkSize + minioClient *minio.Client } func New(mc *minio.Client) *Server { @@ -77,7 +87,6 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error { } // TODO(bmizerany): parallelize - const chunkSizeTODO = 50 * 1024 * 1024 var requirements []apitype.Requirement for _, l := range m.Layers { if l.Size == 0 { @@ -91,21 +100,29 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error { return err } if !pushed { - const expires = 15 * time.Minute - key := path.Join("blobs", l.Digest) - signedURL, err := s.mc().PresignedPutObject(r.Context(), "test", key, expires) - if err != nil { - return err + uploadID := generateUploadID() + for n, c := range upload.Chunks(l.Size, cmp.Or(s.UploadChunkSize, DefaultUploadChunkSize)) { + const expires = 15 * time.Minute + + key := path.Join("blobs", l.Digest) + signedURL, err := s.mc().Presign(r.Context(), "PUT", "test", key, expires, url.Values{ + "UploadId": []string{uploadID}, + "PartNumber": []string{strconv.Itoa(n)}, + "ContentLength": []string{strconv.FormatInt(c.Size, 10)}, + }) + if err != nil { + return err + } + + requirements = append(requirements, apitype.Requirement{ + Digest: l.Digest, + Offset: c.Offset, + Size: c.Size, + + // TODO(bmizerany): use signed+temp urls + URL: signedURL.String(), + }) } - - size := min(l.Size, chunkSizeTODO) - requirements = append(requirements, apitype.Requirement{ - Digest: l.Digest, - Size: size, - - // TODO(bmizerany): use signed+temp urls - URL: signedURL.String(), - }) } } @@ -158,3 +175,12 @@ func (s *Server) mc() *minio.Client { } return mc } + +func generateUploadID() string { + const hex = "0123456789abcdef" + b := make([]byte, 32) + for i := range b { + b[i] = hex[rand.Intn(len(hex))] + } + return string(b) +}