diff --git a/x/api/api.go b/x/api/api.go index 2d7800e4..57ad9fd9 100644 --- a/x/api/api.go +++ b/x/api/api.go @@ -99,7 +99,7 @@ func (s *Server) handlePush(_ http.ResponseWriter, r *http.Request) error { // commit the manifest to the registry requirements, err = c.Push(r.Context(), params.Name, man, ®istry.PushParams{ - Uploaded: uploads, + CompleteParts: uploads, }) if err != nil { return err diff --git a/x/registry/apitype/apitype.go b/x/registry/apitype/apitype.go index de6f6152..19602868 100644 --- a/x/registry/apitype/apitype.go +++ b/x/registry/apitype/apitype.go @@ -23,7 +23,7 @@ type PushRequest struct { // Parts is a list of upload parts that the client upload in the previous // push. - Uploaded []CompletePart `json:"part_uploads"` + CompleteParts []CompletePart `json:"part_uploads"` } type Requirement struct { diff --git a/x/registry/client.go b/x/registry/client.go index 54d90bb5..2d77efe4 100644 --- a/x/registry/client.go +++ b/x/registry/client.go @@ -24,7 +24,7 @@ func (c *Client) oclient() *ollama.Client { } type PushParams struct { - Uploaded []apitype.CompletePart + CompleteParts []apitype.CompletePart } // Push pushes a manifest to the server. @@ -32,9 +32,9 @@ func (c *Client) Push(ctx context.Context, ref string, manifest []byte, p *PushP p = cmp.Or(p, &PushParams{}) // TODO(bmizerany): backoff v, err := ollama.Do[apitype.PushResponse](ctx, c.oclient(), "POST", "/v1/push", &apitype.PushRequest{ - Ref: ref, - Manifest: manifest, - Uploaded: p.Uploaded, + Ref: ref, + Manifest: manifest, + CompleteParts: p.CompleteParts, }) if err != nil { return nil, err diff --git a/x/registry/server.go b/x/registry/server.go index 884778ef..6c5d3b57 100644 --- a/x/registry/server.go +++ b/x/registry/server.go @@ -101,9 +101,9 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error { } completePartsByUploadID := make(map[string]completeParts) - for _, pu := range pr.Uploaded { + for _, mcp := range pr.CompleteParts { // parse the URL - u, err := url.Parse(pu.URL) + u, err := url.Parse(mcp.URL) if err != nil { return err } @@ -117,8 +117,7 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error { if err != nil { return oweb.Mistake("invalid", "url", "invalid or missing PartNumber") } - etag := pu.ETag - if etag == "" { + if mcp.ETag == "" { return oweb.Mistake("invalid", "etag", "missing") } cp, ok := completePartsByUploadID[uploadID] @@ -128,7 +127,7 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error { } cp.parts = append(cp.parts, minio.CompletePart{ PartNumber: partNumber, - ETag: etag, + ETag: mcp.ETag, }) completePartsByUploadID[uploadID] = cp } diff --git a/x/registry/server_test.go b/x/registry/server_test.go index 3dbae41f..faf50445 100644 --- a/x/registry/server_test.go +++ b/x/registry/server_test.go @@ -28,15 +28,22 @@ import ( "kr.dev/diff" ) -func testPush(t *testing.T, chunkSize int64) { - t.Run(fmt.Sprintf("chunkSize=%d", chunkSize), func(t *testing.T) { - mc := startMinio(t, true) +func TestPushBasic(t *testing.T) { + const MB = 1024 * 1024 - const MB = 1024 * 1024 + mc := startMinio(t, true) - // Upload two small layers and one large layer that will - // trigger a multipart upload. - manifest := []byte(`{ + defer func() { + mcc := &minio.Core{Client: mc} + // fail if there are any incomplete uploads + for x := range mcc.ListIncompleteUploads(context.Background(), "test", "theKey", true) { + t.Errorf("incomplete: %v", x) + } + }() + + // Upload two small layers and one large layer that will + // trigger a multipart upload. + manifest := []byte(`{ "layers": [ {"digest": "sha256-1", "size": 1}, {"digest": "sha256-2", "size": 2}, @@ -44,106 +51,100 @@ func testPush(t *testing.T, chunkSize int64) { ] }`) - const ref = "registry.ollama.ai/x/y:latest+Z" + const ref = "registry.ollama.ai/x/y:latest+Z" - hs := httptest.NewServer(&Server{ - minioClient: mc, - UploadChunkSize: 5 * MB, - }) - t.Cleanup(hs.Close) - c := &Client{BaseURL: hs.URL} + hs := httptest.NewServer(&Server{ + minioClient: mc, + UploadChunkSize: 5 * MB, + }) + t.Cleanup(hs.Close) + c := &Client{BaseURL: hs.URL} - requirements, err := c.Push(context.Background(), ref, manifest, nil) + requirements, err := c.Push(context.Background(), ref, manifest, nil) + if err != nil { + t.Fatal(err) + } + + if len(requirements) < 3 { + t.Errorf("expected at least 3 requirements; got %d", len(requirements)) + t.Logf("requirements: %v", requirements) + } + + var uploaded []apitype.CompletePart + for i, r := range requirements { + t.Logf("[%d] pushing layer: offset=%d size=%d", i, r.Offset, r.Size) + + cp, err := PushLayer(context.Background(), &abcReader{}, r.URL, r.Offset, r.Size) if err != nil { t.Fatal(err) } + uploaded = append(uploaded, cp) + } - if len(requirements) < 3 { - t.Fatalf("expected at least 3 requirements; got %d", len(requirements)) - t.Logf("requirements: %v", requirements) - } + requirements, err = c.Push(context.Background(), ref, manifest, &PushParams{ + CompleteParts: uploaded, + }) + if err != nil { + t.Fatal(err) + } + if len(requirements) != 0 { + t.Errorf("unexpected requirements: %v", requirements) + } - var uploaded []apitype.CompletePart - for i, r := range requirements { - t.Logf("[%d] pushing layer: offset=%d size=%d", i, r.Offset, r.Size) + var paths []string + keys := mc.ListObjects(context.Background(), "test", minio.ListObjectsOptions{ + Recursive: true, + }) + for k := range keys { + paths = append(paths, k.Key) + } - cp, err := PushLayer(context.Background(), &abcReader{}, r.URL, r.Offset, r.Size) - if err != nil { - t.Fatal(err) - } - uploaded = append(uploaded, cp) - } + t.Logf("paths: %v", paths) - requirements, err = c.Push(context.Background(), ref, manifest, &PushParams{ - Uploaded: uploaded, - }) - if err != nil { - t.Fatal(err) - } - if len(requirements) != 0 { - t.Fatalf("unexpected requirements: %v", requirements) - } + diff.Test(t, t.Errorf, paths, []string{ + "blobs/sha256-1", + "blobs/sha256-2", + "blobs/sha256-3", + "manifests/registry.ollama.ai/x/y/latest/Z", + }) - var paths []string - keys := mc.ListObjects(context.Background(), "test", minio.ListObjectsOptions{ - Recursive: true, - }) - for k := range keys { - paths = append(paths, k.Key) - } + obj, err := mc.GetObject(context.Background(), "test", "manifests/registry.ollama.ai/x/y/latest/Z", minio.GetObjectOptions{}) + if err != nil { + t.Fatal(err) + } + defer obj.Close() - t.Logf("paths: %v", paths) + var gotM apitype.Manifest + if err := json.NewDecoder(obj).Decode(&gotM); err != nil { + t.Fatal(err) + } - diff.Test(t, t.Errorf, paths, []string{ - "blobs/sha256-1", - "blobs/sha256-2", - "blobs/sha256-3", - "manifests/registry.ollama.ai/x/y/latest/Z", - }) + diff.Test(t, t.Errorf, gotM, apitype.Manifest{ + Layers: []apitype.Layer{ + {Digest: "sha256-1", Size: 1}, + {Digest: "sha256-2", Size: 2}, + {Digest: "sha256-3", Size: 3}, + }, + }) - obj, err := mc.GetObject(context.Background(), "test", "manifests/registry.ollama.ai/x/y/latest/Z", minio.GetObjectOptions{}) + // checksum the blobs + for i, l := range gotM.Layers { + obj, err := mc.GetObject(context.Background(), "test", "blobs/"+l.Digest, minio.GetObjectOptions{}) if err != nil { t.Fatal(err) } defer obj.Close() - var gotM apitype.Manifest - if err := json.NewDecoder(obj).Decode(&gotM); err != nil { + info, err := obj.Stat() + if err != nil { t.Fatal(err) } + t.Logf("[%d] layer info: name=%q l.Size=%d size=%d", i, info.Key, l.Size, info.Size) - diff.Test(t, t.Errorf, gotM, apitype.Manifest{ - Layers: []apitype.Layer{ - {Digest: "sha256-1", Size: 1}, - {Digest: "sha256-2", Size: 2}, - {Digest: "sha256-3", Size: 3}, - }, - }) - - // checksum the blobs - for i, l := range gotM.Layers { - obj, err := mc.GetObject(context.Background(), "test", "blobs/"+l.Digest, minio.GetObjectOptions{}) - if err != nil { - t.Fatal(err) - } - defer obj.Close() - - info, err := obj.Stat() - if err != nil { - t.Fatal(err) - } - t.Logf("[%d] layer info: name=%q l.Size=%d size=%d", i, info.Key, l.Size, info.Size) - - if msg := checkABCs(obj, int(l.Size)); msg != "" { - t.Errorf("[%d] %s", i, msg) - } + if msg := checkABCs(obj, int(l.Size)); msg != "" { + t.Errorf("[%d] %s", i, msg) } - }) -} - -func TestPush(t *testing.T) { - testPush(t, 0) - testPush(t, 1) + } } // TestBasicPresignS3MultipartReferenceDoNotDelete tests the basic flow of @@ -318,9 +319,11 @@ func startMinio(t *testing.T, trace bool) *minio.Client { if err != nil { t.Fatalf("startMinio: %v", err) } - if mc.IsOnline() { + // try list buckets to see if server is up + if _, err := mc.ListBuckets(ctx); err == nil { break } + t.Logf("startMinio: server is offline; retrying") } if trace {