From 1a346640dba00e73ef36b30db84b4bdfdc2b60dd Mon Sep 17 00:00:00 2001 From: Blake Mizerany Date: Wed, 3 Apr 2024 14:44:29 -0700 Subject: [PATCH] x/registry: work on getting basic test passing --- x/registry/apitype/apitype.go | 2 +- x/registry/server.go | 14 ++--- x/registry/server_test.go | 108 ++++++++++++++++++++++++---------- 3 files changed, 85 insertions(+), 39 deletions(-) diff --git a/x/registry/apitype/apitype.go b/x/registry/apitype/apitype.go index 36f2a342..de6f6152 100644 --- a/x/registry/apitype/apitype.go +++ b/x/registry/apitype/apitype.go @@ -7,7 +7,7 @@ type Manifest struct { } type CompletePart struct { - URL string `json:"url"` // contains PartNumber and UploadID from server + URL string `json:"url"` // contains partNumber and uploadId from server ETag string `json:"etag"` } diff --git a/x/registry/server.go b/x/registry/server.go index b8dcafca..e6baf94b 100644 --- a/x/registry/server.go +++ b/x/registry/server.go @@ -109,11 +109,12 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error { return err } q := u.Query() - uploadID := q.Get("UploadId") + uploadID := q.Get("uploadId") if uploadID == "" { - return oweb.Mistake("invalid", "url", "missing UploadId") + // not a part upload + continue } - partNumber, err := strconv.Atoi(q.Get("PartNumber")) + partNumber, err := strconv.Atoi(q.Get("partNumber")) if err != nil { return oweb.Mistake("invalid", "url", "invalid or missing PartNumber") } @@ -165,12 +166,10 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error { } requirements = append(requirements, apitype.Requirement{ Digest: l.Digest, - Offset: 0, Size: l.Size, URL: signedURL.String(), }) } else { - key := path.Join("blobs", l.Digest) uploadID, err := mcc.NewMultipartUpload(r.Context(), bucketTODO, key, minio.PutObjectOptions{}) if err != nil { return err @@ -179,9 +178,8 @@ func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error { const timeToStartUpload = 15 * time.Minute signedURL, err := s.mc().Presign(r.Context(), "PUT", bucketTODO, key, timeToStartUpload, url.Values{ - "UploadId": []string{uploadID}, - "PartNumber": []string{strconv.Itoa(partNumber)}, - "ContentLength": []string{strconv.FormatInt(c.N, 10)}, + "uploadId": []string{uploadID}, + "partNumber": []string{strconv.Itoa(partNumber)}, }) if err != nil { return err diff --git a/x/registry/server_test.go b/x/registry/server_test.go index 1952feb9..72342256 100644 --- a/x/registry/server_test.go +++ b/x/registry/server_test.go @@ -1,7 +1,9 @@ package registry import ( + "bufio" "bytes" + "cmp" "context" "crypto/sha256" "encoding/json" @@ -33,11 +35,16 @@ func testPush(t *testing.T, chunkSize int64) { t.Run(fmt.Sprintf("chunkSize=%d", chunkSize), func(t *testing.T) { mc := startMinio(t, false) + const MB = 1024 * 1024 + const FiveMB = 5 * MB + + // Upload two small layers and one large layer that will + // trigger a multipart upload. manifest := []byte(`{ "layers": [ {"digest": "sha256-1", "size": 1}, {"digest": "sha256-2", "size": 2}, - {"digest": "sha256-3", "size": 3} + {"digest": "sha256-3", "size": 11000000} ] }`) @@ -198,7 +205,9 @@ func pushLayer(body io.ReaderAt, url string, off, n int64) (apitype.CompletePart // is tricky and if we get it wrong in our server, we can refer back to this // as a "back to basics" test/reference. func TestBasicPresignS3MultipartReferenceDoNotDelete(t *testing.T) { - mc := startMinio(t, false) + // t.Skip("skipping reference test; unskip when needed") + + mc := startMinio(t, true) mcc := &minio.Core{Client: mc} uploadID, err := mcc.NewMultipartUpload(context.Background(), "test", "theKey", minio.PutObjectOptions{}) @@ -288,9 +297,13 @@ func availableAddr() string { return l.Addr().String() } -func startMinio(t *testing.T, debug bool) *minio.Client { +func startMinio(t *testing.T, trace bool) *minio.Client { t.Helper() + // Trace is enabled by setting the OLLAMA_MINIO_TRACE environment or + // explicitly setting trace to true. + trace = cmp.Or(trace, os.Getenv("OLLAMA_MINIO_TRACE") != "") + dir := t.TempDir() + "-keep" // prevent tempdir from auto delete t.Cleanup(func() { @@ -298,26 +311,42 @@ func startMinio(t *testing.T, debug bool) *minio.Client { // future runs may be able to inspect results for some time. }) + waitAndMaybeLogError := func(cmd *exec.Cmd) { + if err := cmd.Wait(); err != nil { + var e *exec.ExitError + if errors.As(err, &e) { + if !e.Exited() { + // died due to our signal + return + } + t.Errorf("%s stderr: %s", cmd.Path, e.Stderr) + t.Errorf("%s exit status: %v", cmd.Path, e.ExitCode()) + t.Errorf("%s exited: %v", cmd.Path, e.Exited()) + t.Errorf("%s stderr: %s", cmd.Path, e.Stderr) + } else { + t.Errorf("%s exit error: %v", cmd.Path, err) + } + } + } + + // Cancel must be called first so do wait to add to Cleanup + // stack as last cleanup. + ctx, cancel := context.WithCancel(context.Background()) + deadline, ok := t.Deadline() + if ok { + ctx, cancel = context.WithDeadline(ctx, deadline.Add(-100*time.Millisecond)) + } + t.Logf(">> minio: minio server %s", dir) addr := availableAddr() - cmd := exec.Command("minio", "server", "--address", addr, dir) + cmd := exec.CommandContext(ctx, "minio", "server", "--address", addr, dir) cmd.Env = os.Environ() - - // TODO(bmizerany): wait delay etc... if err := cmd.Start(); err != nil { t.Fatal(err) } t.Cleanup(func() { - cmd.Process.Kill() - if err := cmd.Wait(); err != nil { - var e *exec.ExitError - if errors.As(err, &e) && e.Exited() { - t.Logf("minio stderr: %s", e.Stderr) - t.Logf("minio exit status: %v", e.ExitCode()) - t.Logf("minio exited: %v", e.Exited()) - t.Error(err) - } - } + cancel() + waitAndMaybeLogError(cmd) }) mc, err := minio.New(addr, &minio.Options{ @@ -328,13 +357,6 @@ func startMinio(t *testing.T, debug bool) *minio.Client { t.Fatal(err) } - ctx, cancel := context.WithCancel(context.Background()) - deadline, ok := t.Deadline() - if ok { - ctx, cancel = context.WithDeadline(ctx, deadline.Add(-100*time.Millisecond)) - defer cancel() - } - // wait for server to start with exponential backoff for _, err := range backoff.Upto(ctx, 1*time.Second) { if err != nil { @@ -345,18 +367,44 @@ func startMinio(t *testing.T, debug bool) *minio.Client { } } - if debug { - // I was using mc.TraceOn here but wasn't giving any output - // that was meaningful. I really want all server logs, not - // client HTTP logs. We have places we do not use a minio - // client and cannot or do not want to use a minio client. - panic("TODO") + if trace { + cmd := exec.CommandContext(ctx, "mc", "admin", "trace", "--verbose", "test") + cmd.Env = append(os.Environ(), + "MC_HOST_test=http://minioadmin:minioadmin@"+addr, + ) + + stdout, err := cmd.StdoutPipe() + if err != nil { + t.Fatal(err) + } + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + + doneLogging := make(chan struct{}) + sc := bufio.NewScanner(stdout) + go func() { + defer close(doneLogging) + + // Scan lines until the process exits. + for sc.Scan() { + t.Logf("mc trace: %s", sc.Text()) + } + _ = sc.Err() // ignore (not important) + }() + t.Cleanup(func() { + cancel() + waitAndMaybeLogError(cmd) + + // Make sure we do not log after test exists to + // avoid panic. + <-doneLogging + }) } if err := mc.MakeBucket(context.Background(), "test", minio.MakeBucketOptions{}); err != nil { t.Fatal(err) } - return mc }