From cb42e607c5cf4d439ad4d5a93ed13c7d6a09fc34 Mon Sep 17 00:00:00 2001 From: Blake Mizerany Date: Mon, 24 Jun 2024 21:47:52 -0700 Subject: [PATCH] llm: speed up gguf decoding by a lot (#5246) Previously, some costly things were causing the loading of GGUF files and their metadata and tensor information to be VERY slow: * Too many allocations when decoding strings * Hitting disk for each read of each key and value, resulting in a not-okay amount of syscalls/disk I/O. The show API is now down to 33ms from 800ms+ for llama3 on a macbook pro m3. This commit also prevents collecting large arrays of values when decoding GGUFs (if desired). When such keys are encountered, their values are null, and are encoded as such in JSON. Also, this fixes a broken test that was not encoding valid GGUF. --- llm/ggla.go | 13 ++- llm/ggml.go | 25 ++++-- llm/ggml_test.go | 1 + llm/gguf.go | 130 +++++++++++++++++++-------- llm/memory_test.go | 19 ++-- llm/server.go | 11 ++- server/images.go | 2 +- server/model.go | 6 +- server/routes.go | 19 +++- server/sched.go | 2 +- server/sched_test.go | 6 +- util/bufioutil/buffer_seeker.go | 34 +++++++ util/bufioutil/buffer_seeker_test.go | 64 +++++++++++++ 13 files changed, 263 insertions(+), 69 deletions(-) create mode 100644 llm/ggml_test.go create mode 100644 util/bufioutil/buffer_seeker.go create mode 100644 util/bufioutil/buffer_seeker_test.go diff --git a/llm/ggla.go b/llm/ggla.go index a5d90b6c..34c4f6ca 100644 --- a/llm/ggla.go +++ b/llm/ggla.go @@ -53,7 +53,7 @@ func (llm *ggla) Tensors() Tensors { return llm.tensors } -func (llm *ggla) decode(rs io.ReadSeeker) error { +func (llm *ggla) decode(rs io.ReadSeeker) (retErr error) { var r uint32 if err := binary.Read(rs, binary.LittleEndian, &r); err != nil { return err @@ -69,9 +69,18 @@ func (llm *ggla) decode(rs io.ReadSeeker) error { for { var dims uint32 if err := binary.Read(rs, binary.LittleEndian, &dims); err != nil { + if errors.Is(err, io.EOF) { + return nil + } return err } + defer func() { + if errors.Is(retErr, io.EOF) { + retErr = io.ErrUnexpectedEOF + } + }() + var namesize uint32 if err := binary.Read(rs, binary.LittleEndian, &namesize); err != nil { return err @@ -108,7 +117,7 @@ func (llm *ggla) decode(rs io.ReadSeeker) error { return err } - if _, err := rs.Seek((offset+31)&-32, io.SeekStart); err != nil { + if _, err := rs.Seek((offset+31)&-32-offset, io.SeekCurrent); err != nil { return err } diff --git a/llm/ggml.go b/llm/ggml.go index f02f0ff6..d0d0b6dd 100644 --- a/llm/ggml.go +++ b/llm/ggml.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "strings" + + "github.com/ollama/ollama/util/bufioutil" ) type GGML struct { @@ -278,7 +280,18 @@ func DetectGGMLType(b []byte) string { } } -func DecodeGGML(rs io.ReadSeeker) (*GGML, int64, error) { +// DecodeGGML decodes a GGML model from the given reader. +// +// It collects array values for arrays with a size less than or equal to +// maxArraySize. If maxArraySize is 0, the default value of 1024 is used. If +// the maxArraySize is negative, all arrays are collected. +func DecodeGGML(rs io.ReadSeeker, maxArraySize int) (*GGML, int64, error) { + if maxArraySize == 0 { + maxArraySize = 1024 + } + + rs = bufioutil.NewBufferedSeeker(rs, 32<<10) + var magic uint32 if err := binary.Read(rs, binary.LittleEndian, &magic); err != nil { return nil, 0, err @@ -291,17 +304,15 @@ func DecodeGGML(rs io.ReadSeeker) (*GGML, int64, error) { case FILE_MAGIC_GGLA: c = &containerGGLA{} case FILE_MAGIC_GGUF_LE: - c = &containerGGUF{ByteOrder: binary.LittleEndian} + c = &containerGGUF{ByteOrder: binary.LittleEndian, maxArraySize: maxArraySize} case FILE_MAGIC_GGUF_BE: - c = &containerGGUF{ByteOrder: binary.BigEndian} + c = &containerGGUF{ByteOrder: binary.BigEndian, maxArraySize: maxArraySize} default: return nil, 0, errors.New("invalid file magic") } model, err := c.Decode(rs) - if errors.Is(err, io.EOF) { - // noop - } else if err != nil { + if err != nil { return nil, 0, err } @@ -321,7 +332,7 @@ func (llm GGML) GraphSize(context, batch uint64) (partialOffload, fullOffload ui embedding := llm.KV().EmbeddingLength() heads := llm.KV().HeadCount() headsKV := llm.KV().HeadCountKV() - vocab := uint64(len(llm.KV()["tokenizer.ggml.tokens"].([]any))) + vocab := uint64(llm.KV()["tokenizer.ggml.tokens"].(*array).size) embeddingHeads := llm.KV().EmbeddingHeadCount() embeddingHeadsK := llm.KV().EmbeddingHeadCountK() diff --git a/llm/ggml_test.go b/llm/ggml_test.go new file mode 100644 index 00000000..006c3ded --- /dev/null +++ b/llm/ggml_test.go @@ -0,0 +1 @@ +package llm diff --git a/llm/gguf.go b/llm/gguf.go index 234efe57..4d343a1b 100644 --- a/llm/gguf.go +++ b/llm/gguf.go @@ -3,11 +3,10 @@ package llm import ( "bytes" "encoding/binary" + "encoding/json" "fmt" "io" "strings" - - "log/slog" ) type containerGGUF struct { @@ -29,6 +28,12 @@ type containerGGUF struct { NumTensor uint64 NumKV uint64 } + + maxArraySize int +} + +func (c *containerGGUF) canCollectArray(size int) bool { + return c.maxArraySize < 0 || size <= c.maxArraySize } func (c *containerGGUF) Name() string { @@ -54,7 +59,6 @@ func (c *containerGGUF) Decode(rs io.ReadSeeker) (model, error) { } model := newGGUF(c) - slog.Debug(fmt.Sprintf("model = %#v", model)) if err := model.Decode(rs); err != nil { return nil, err } @@ -85,6 +89,8 @@ type gguf struct { tensors []*Tensor parameters uint64 + + scratch [16 << 10]byte } func newGGUF(container *containerGGUF) *gguf { @@ -181,34 +187,34 @@ func (llm *gguf) Decode(rs io.ReadSeeker) error { } // decode tensors - for i := 0; uint64(i) < llm.numTensor(); i++ { + for range llm.numTensor() { name, err := readGGUFString(llm, rs) if err != nil { - return err + return fmt.Errorf("failed to read tensor name: %w", err) } // dims is the number of dimensions in the tensor dims, err := readGGUF[uint32](llm, rs) if err != nil { - return err + return fmt.Errorf("failed to read tensor dimensions: %w", err) } shape := [4]uint64{1, 1, 1, 1} for i := 0; uint32(i) < dims; i++ { shape[i], err = readGGUF[uint64](llm, rs) if err != nil { - return err + return fmt.Errorf("failed to read tensor shape: %w", err) } } kind, err := readGGUF[uint32](llm, rs) if err != nil { - return err + return fmt.Errorf("failed to read tensor kind: %w", err) } offset, err := readGGUF[uint64](llm, rs) if err != nil { - return err + return fmt.Errorf("failed to read tensor offset: %w", err) } tensor := Tensor{ @@ -230,24 +236,19 @@ func (llm *gguf) Decode(rs io.ReadSeeker) error { alignment = 32 } - offset, err := rs.Seek(0, io.SeekCurrent) - if err != nil { - return err - } - - padding := llm.padding(offset, int64(alignment)) - if _, err := rs.Seek(padding, io.SeekCurrent); err != nil { - return err - } - for _, tensor := range llm.tensors { - if _, err := rs.Seek(int64(tensor.Size()), io.SeekCurrent); err != nil { - return err + offset, err := rs.Seek(0, io.SeekCurrent) + if err != nil { + return fmt.Errorf("failed to get current offset: %w", err) } - padding := llm.padding(int64(tensor.Size()), int64(alignment)) + padding := llm.padding(offset, int64(alignment)) if _, err := rs.Seek(padding, io.SeekCurrent); err != nil { - return err + return fmt.Errorf("failed to seek to init padding: %w", err) + } + + if _, err := rs.Seek(int64(tensor.Size()), io.SeekCurrent); err != nil { + return fmt.Errorf("failed to seek to tensor: %w", err) } } @@ -285,22 +286,48 @@ func readGGUFV1String(llm *gguf, r io.Reader) (string, error) { return b.String(), nil } +func discardGGUFString(llm *gguf, r io.Reader) error { + buf := llm.scratch[:8] + _, err := io.ReadFull(r, buf) + if err != nil { + return err + } + + size := int(llm.ByteOrder.Uint64(buf)) + for size > 0 { + n, err := r.Read(llm.scratch[:min(size, cap(llm.scratch))]) + if err != nil { + return err + } + size -= n + } + return nil +} + func readGGUFString(llm *gguf, r io.Reader) (string, error) { if llm.Version == 1 { return readGGUFV1String(llm, r) } - var length uint64 - if err := binary.Read(r, llm.ByteOrder, &length); err != nil { + buf := llm.scratch[:8] + _, err := io.ReadFull(r, buf) + if err != nil { return "", err } - var b bytes.Buffer - if _, err := io.CopyN(&b, r, int64(length)); err != nil { + length := int(llm.ByteOrder.Uint64(buf)) + if length > len(llm.scratch) { + buf = make([]byte, length) + } else { + buf = llm.scratch[:length] + } + clear(buf) + + _, err = io.ReadFull(r, buf) + if err != nil { return "", err } - - return b.String(), nil + return string(buf), nil } func writeGGUFString(llm *gguf, w io.Writer, s string) error { @@ -316,7 +343,16 @@ func writeGGUFString(llm *gguf, w io.Writer, s string) error { return err } -func readGGUFV1Array(llm *gguf, r io.Reader) (a []any, err error) { +type array struct { + size int + values []any +} + +func (a *array) MarshalJSON() ([]byte, error) { + return json.Marshal(a.values) +} + +func readGGUFV1Array(llm *gguf, r io.Reader) (*array, error) { t, err := readGGUF[uint32](llm, r) if err != nil { return nil, err @@ -327,7 +363,12 @@ func readGGUFV1Array(llm *gguf, r io.Reader) (a []any, err error) { return nil, err } - for i := 0; uint32(i) < n; i++ { + a := &array{size: int(n)} + if llm.canCollectArray(int(n)) { + a.values = make([]any, 0, int(n)) + } + + for i := range n { var e any switch t { case ggufTypeUint8: @@ -361,13 +402,15 @@ func readGGUFV1Array(llm *gguf, r io.Reader) (a []any, err error) { return nil, err } - a = append(a, e) + if a.values != nil { + a.values[i] = e + } } - return + return a, nil } -func readGGUFArray(llm *gguf, r io.Reader) (a []any, err error) { +func readGGUFArray(llm *gguf, r io.Reader) (*array, error) { if llm.Version == 1 { return readGGUFV1Array(llm, r) } @@ -382,7 +425,12 @@ func readGGUFArray(llm *gguf, r io.Reader) (a []any, err error) { return nil, err } - for i := 0; uint64(i) < n; i++ { + a := &array{size: int(n)} + if llm.canCollectArray(int(n)) { + a.values = make([]any, int(n)) + } + + for i := range n { var e any switch t { case ggufTypeUint8: @@ -408,7 +456,11 @@ func readGGUFArray(llm *gguf, r io.Reader) (a []any, err error) { case ggufTypeBool: e, err = readGGUF[bool](llm, r) case ggufTypeString: - e, err = readGGUFString(llm, r) + if a.values != nil { + e, err = readGGUFString(llm, r) + } else { + err = discardGGUFString(llm, r) + } default: return nil, fmt.Errorf("invalid array type: %d", t) } @@ -416,10 +468,12 @@ func readGGUFArray(llm *gguf, r io.Reader) (a []any, err error) { return nil, err } - a = append(a, e) + if a.values != nil { + a.values[i] = e + } } - return + return a, nil } func writeGGUFArray[S ~[]E, E any](llm *gguf, w io.Writer, t uint32, s S) error { diff --git a/llm/memory_test.go b/llm/memory_test.go index 8eaa0771..f972f927 100644 --- a/llm/memory_test.go +++ b/llm/memory_test.go @@ -22,13 +22,14 @@ func TestEstimateGPULayers(t *testing.T) { defer f.Close() gguf := NewGGUFV3(binary.LittleEndian) inputLayerCount := 5 + tensors := []Tensor{ - {Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}}, - {Name: "blk.1.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}}, - {Name: "blk.2.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}}, - {Name: "blk.3.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}}, - {Name: "blk.4.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}}, - {Name: "output.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}}, + {Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))}, + {Name: "blk.1.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))}, + {Name: "blk.2.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))}, + {Name: "blk.3.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))}, + {Name: "blk.4.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))}, + {Name: "output.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))}, } assert.Len(t, tensors, inputLayerCount+1) err = gguf.Encode(f, KV{ @@ -45,8 +46,10 @@ func TestEstimateGPULayers(t *testing.T) { }, tensors) require.NoError(t, err) - ggml, err := LoadModel(f.Name()) - require.NoError(t, err) + ggml, err := LoadModel(f.Name(), 0) + if err != nil { + t.Fatal(err) + } // Simple CPU scenario gpus := []gpu.GpuInfo{ diff --git a/llm/server.go b/llm/server.go index da83416e..ad67138b 100644 --- a/llm/server.go +++ b/llm/server.go @@ -60,7 +60,12 @@ type llmServer struct { sem *semaphore.Weighted } -func LoadModel(model string) (*GGML, error) { +// LoadModel will load a model from disk. The model must be in the GGML format. +// +// It collects array values for arrays with a size less than or equal to +// maxArraySize. If maxArraySize is 0, the default value of 1024 is used. If +// the maxArraySize is negative, all arrays are collected. +func LoadModel(model string, maxArraySize int) (*GGML, error) { if _, err := os.Stat(model); err != nil { return nil, err } @@ -71,7 +76,7 @@ func LoadModel(model string) (*GGML, error) { } defer f.Close() - ggml, _, err := DecodeGGML(f) + ggml, _, err := DecodeGGML(f, maxArraySize) return ggml, err } @@ -412,7 +417,7 @@ func projectorMemoryRequirements(filename string) uint64 { } defer file.Close() - ggml, _, err := DecodeGGML(file) + ggml, _, err := DecodeGGML(file, 0) if err != nil { return 0 } diff --git a/server/images.go b/server/images.go index 98794149..e949fb18 100644 --- a/server/images.go +++ b/server/images.go @@ -423,7 +423,7 @@ func CreateModel(ctx context.Context, name model.Name, modelFileDir, quantizatio return err } - ggml, _, err := llm.DecodeGGML(temp) + ggml, _, err := llm.DecodeGGML(temp, 0) if err != nil { return err } diff --git a/server/model.go b/server/model.go index b262ea38..055ffd63 100644 --- a/server/model.go +++ b/server/model.go @@ -63,7 +63,7 @@ func parseFromModel(ctx context.Context, name model.Name, fn func(api.ProgressRe } defer blob.Close() - ggml, _, err := llm.DecodeGGML(blob) + ggml, _, err := llm.DecodeGGML(blob, 0) if err != nil { return nil, err } @@ -176,7 +176,7 @@ func parseFromZipFile(_ context.Context, file *os.File, digest string, fn func(a } defer bin.Close() - ggml, _, err := llm.DecodeGGML(bin) + ggml, _, err := llm.DecodeGGML(bin, 0) if err != nil { return nil, err } @@ -210,7 +210,7 @@ func parseFromFile(ctx context.Context, file *os.File, digest string, fn func(ap var offset int64 for offset < stat.Size() { - ggml, n, err := llm.DecodeGGML(file) + ggml, n, err := llm.DecodeGGML(file, 0) if errors.Is(err, io.EOF) { break } else if err != nil { diff --git a/server/routes.go b/server/routes.go index 3d112e9f..ff66663c 100644 --- a/server/routes.go +++ b/server/routes.go @@ -754,7 +754,11 @@ func GetModelInfo(req api.ShowRequest) (*api.ShowResponse, error) { } func getKVData(digest string, verbose bool) (llm.KV, error) { - kvData, err := llm.LoadModel(digest) + maxArraySize := 0 + if verbose { + maxArraySize = -1 + } + kvData, err := llm.LoadModel(digest, maxArraySize) if err != nil { return nil, err } @@ -1101,11 +1105,20 @@ func Serve(ln net.Listener) error { schedCtx, schedDone := context.WithCancel(ctx) sched := InitScheduler(schedCtx) s := &Server{addr: ln.Addr(), sched: sched} - r := s.GenerateRoutes() + + http.Handle("/", s.GenerateRoutes()) slog.Info(fmt.Sprintf("Listening on %s (version %s)", ln.Addr(), version.Version)) srvr := &http.Server{ - Handler: r, + // Use http.DefaultServeMux so we get net/http/pprof for + // free. + // + // TODO(bmizerany): Decide if we want to make this + // configurable so it is not exposed by default, or allow + // users to bind it to a different port. This was a quick + // and easy way to get pprof, but it may not be the best + // way. + Handler: nil, } // listen for a ctrl+c and stop any loaded llm diff --git a/server/sched.go b/server/sched.go index 42439554..0084b533 100644 --- a/server/sched.go +++ b/server/sched.go @@ -144,7 +144,7 @@ func (s *Scheduler) processPending(ctx context.Context) { } // Load model for fitting - ggml, err := llm.LoadModel(pending.model.ModelPath) + ggml, err := llm.LoadModel(pending.model.ModelPath, 0) if err != nil { pending.errCh <- err break diff --git a/server/sched_test.go b/server/sched_test.go index 95328834..4a1cf72a 100644 --- a/server/sched_test.go +++ b/server/sched_test.go @@ -128,14 +128,14 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV "tokenizer.ggml.scores": []float32{0}, "tokenizer.ggml.token_type": []int32{0}, }, []llm.Tensor{ - {Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}}, - {Name: "output.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}}, + {Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))}, + {Name: "output.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))}, }) require.NoError(t, err) fname := f.Name() model := &Model{Name: modelName, ModelPath: fname} - scenario.ggml, err = llm.LoadModel(model.ModelPath) + scenario.ggml, err = llm.LoadModel(model.ModelPath, 0) require.NoError(t, err) scenario.req = &LlmRequest{ diff --git a/util/bufioutil/buffer_seeker.go b/util/bufioutil/buffer_seeker.go new file mode 100644 index 00000000..8775fdb8 --- /dev/null +++ b/util/bufioutil/buffer_seeker.go @@ -0,0 +1,34 @@ +package bufioutil + +import ( + "bufio" + "io" +) + +type BufferedSeeker struct { + rs io.ReadSeeker + br *bufio.Reader +} + +func NewBufferedSeeker(rs io.ReadSeeker, size int) *BufferedSeeker { + return &BufferedSeeker{ + rs: rs, + br: bufio.NewReaderSize(rs, size), + } +} + +func (b *BufferedSeeker) Read(p []byte) (int, error) { + return b.br.Read(p) +} + +func (b *BufferedSeeker) Seek(offset int64, whence int) (int64, error) { + if whence == io.SeekCurrent { + offset -= int64(b.br.Buffered()) + } + n, err := b.rs.Seek(offset, whence) + if err != nil { + return 0, err + } + b.br.Reset(b.rs) + return n, nil +} diff --git a/util/bufioutil/buffer_seeker_test.go b/util/bufioutil/buffer_seeker_test.go new file mode 100644 index 00000000..87145f6b --- /dev/null +++ b/util/bufioutil/buffer_seeker_test.go @@ -0,0 +1,64 @@ +package bufioutil + +import ( + "bytes" + "io" + "strings" + "testing" +) + +func TestBufferedSeeker(t *testing.T) { + const alphabet = "abcdefghijklmnopqrstuvwxyz" + + bs := NewBufferedSeeker(strings.NewReader(alphabet), 0) // minReadBufferSize = 16 + + checkRead := func(buf []byte, expected string) { + t.Helper() + _, err := bs.Read(buf) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf, []byte(expected)) { + t.Fatalf("expected %s, got %s", expected, buf) + } + } + + // Read the first 5 bytes + buf := make([]byte, 5) + + checkRead(buf, "abcde") + + // Seek back to the beginning + _, err := bs.Seek(0, io.SeekStart) + if err != nil { + t.Fatal(err) + } + + // read 'a' + checkRead(buf[:1], "a") + + if bs.br.Buffered() == 0 { + t.Fatalf("totally unexpected sanity check failed") + } + + // Seek past 'b' + _, err = bs.Seek(1, io.SeekCurrent) + if err != nil { + t.Fatal(err) + } + checkRead(buf, "cdefg") + + // Seek back to the beginning + _, err = bs.Seek(0, io.SeekStart) + if err != nil { + t.Fatal(err) + } + checkRead(buf, "abcde") + + // Seek to the end + _, err = bs.Seek(-5, io.SeekEnd) + if err != nil { + t.Fatal(err) + } + checkRead(buf, "vwxyz") +}