diff --git a/core/cover.go b/core/cover.go index e121327d2..43c70fa19 100644 --- a/core/cover.go +++ b/core/cover.go @@ -202,5 +202,5 @@ func readFromFile(path string) ([]byte, error) { } func NewImageCache() (ImageCache, error) { - return newFileCache("Image", conf.Server.ImageCacheSize, consts.ImageCacheDir, consts.DefaultImageCacheMaxItems) + return newFSCache("Image", conf.Server.ImageCacheSize, consts.ImageCacheDir, consts.DefaultImageCacheMaxItems) } diff --git a/core/file_caches.go b/core/file_caches.go index a8f0f5bfb..29ac0216e 100644 --- a/core/file_caches.go +++ b/core/file_caches.go @@ -1,7 +1,9 @@ package core import ( + "context" "fmt" + "io" "path/filepath" "github.com/deluan/navidrome/conf" @@ -11,7 +13,116 @@ import ( "github.com/dustin/go-humanize" ) -func newFileCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) { +type ReadFunc func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) + +func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) (*FileCache, error) { + cache, err := newFSCache(name, cacheSize, cacheFolder, maxItems) + if err != nil { + return nil, err + } + return &FileCache{ + name: name, + disabled: cache == nil, + cache: cache, + getReader: getReader, + }, nil +} + +type FileCache struct { + disabled bool + name string + cache fscache.Cache + getReader ReadFunc +} + +func (fc *FileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, error) { + if fc.disabled { + log.Debug(ctx, "Cache disabled", "cache", fc.name) + reader, err := fc.getReader(ctx, arg) + if err != nil { + return nil, err + } + return &CachedStream{Reader: reader}, nil + } + + key := arg.String() + r, w, err := fc.cache.Get(key) + if err != nil { + return nil, err + } + + cached := w == nil + + if !cached { + log.Trace(ctx, "Cache MISS", "cache", fc.name, "key", key) + reader, err := fc.getReader(ctx, arg) + if err != nil { + return nil, err + } + go copyAndClose(ctx, w, reader) + } + + // If it is in the cache, check if the stream is done being written. If so, return a ReaderSeeker + if cached { + size := getFinalCachedSize(r) + if size >= 0 { + log.Trace(ctx, "Cache HIT", "cache", fc.name, "key", key, "size", size) + sr := io.NewSectionReader(r, 0, size) + return &CachedStream{ + Reader: sr, + Seeker: sr, + }, nil + } else { + log.Trace(ctx, "Cache HIT", "cache", fc.name, "key", key) + } + } + + // All other cases, just return a Reader, without Seek capabilities + return &CachedStream{Reader: r}, nil +} + +type CachedStream struct { + io.Reader + io.Seeker +} + +func (s *CachedStream) Seekable() bool { return s.Seeker != nil } +func (s *CachedStream) Close() error { + if c, ok := s.Reader.(io.Closer); ok { + return c.Close() + } + return nil +} + +func getFinalCachedSize(r fscache.ReadAtCloser) int64 { + cr, ok := r.(*fscache.CacheReader) + if ok { + size, final, err := cr.Size() + if final && err == nil { + return size + } + } + return -1 +} + +func copyAndClose(ctx context.Context, w io.WriteCloser, r io.Reader) { + _, err := io.Copy(w, r) + if err != nil { + log.Error(ctx, "Error copying data to cache", err) + } + if c, ok := r.(io.Closer); ok { + err = c.Close() + if err != nil { + log.Error(ctx, "Error closing source stream", err) + } + } + err = w.Close() + if err != nil { + log.Error(ctx, "Error closing cache writer", err) + } +} + +func newFSCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) { if cacheSize == "0" { log.Warn(fmt.Sprintf("%s cache disabled", name)) return nil, nil diff --git a/core/file_caches_test.go b/core/file_caches_test.go index e5e8e32ee..be25084d0 100644 --- a/core/file_caches_test.go +++ b/core/file_caches_test.go @@ -18,20 +18,30 @@ var _ = Describe("File Caches", func() { os.RemoveAll(conf.Server.DataFolder) }) - Describe("newFileCache", func() { + Describe("NewFileCache", func() { It("creates the cache folder", func() { - Expect(newFileCache("test", "1k", "test", 10)).ToNot(BeNil()) + Expect(NewFileCache("test", "1k", "test", 10, nil)).ToNot(BeNil()) _, err := os.Stat(filepath.Join(conf.Server.DataFolder, "test")) Expect(os.IsNotExist(err)).To(BeFalse()) }) It("creates the cache folder with invalid size", func() { - Expect(newFileCache("test", "abc", "test", 10)).ToNot(BeNil()) + fc, err := NewFileCache("test", "abc", "test", 10, nil) + Expect(err).To(BeNil()) + Expect(fc.cache).ToNot(BeNil()) + Expect(fc.disabled).To(BeFalse()) }) It("returns empty if cache size is '0'", func() { - Expect(newFileCache("test", "0", "test", 10)).To(BeNil()) + fc, err := NewFileCache("test", "0", "test", 10, nil) + Expect(err).To(BeNil()) + Expect(fc.cache).To(BeNil()) + Expect(fc.disabled).To(BeTrue()) }) }) + + Describe("FileCache", func() { + + }) }) diff --git a/core/media_streamer.go b/core/media_streamer.go index 298705810..5c9d6dc91 100644 --- a/core/media_streamer.go +++ b/core/media_streamer.go @@ -14,23 +14,31 @@ import ( "github.com/deluan/navidrome/log" "github.com/deluan/navidrome/model" "github.com/deluan/navidrome/model/request" - "github.com/djherbis/fscache" ) type MediaStreamer interface { NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int) (*Stream, error) } -type TranscodingCache fscache.Cache - -func NewMediaStreamer(ds model.DataStore, ffm transcoder.Transcoder, cache TranscodingCache) MediaStreamer { +func NewMediaStreamer(ds model.DataStore, ffm transcoder.Transcoder, cache *FileCache) MediaStreamer { return &mediaStreamer{ds: ds, ffm: ffm, cache: cache} } type mediaStreamer struct { ds model.DataStore ffm transcoder.Transcoder - cache fscache.Cache + cache *FileCache +} + +type streamJob struct { + ms *mediaStreamer + mf *model.MediaFile + format string + bitRate int +} + +func (j *streamJob) String() string { + return fmt.Sprintf("%s.%d.%s", j.mf.ID, j.bitRate, j.format) } func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int) (*Stream, error) { @@ -49,17 +57,13 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat str }() format, bitRate = selectTranscodingOptions(ctx, ms.ds, mf, reqFormat, reqBitRate) - log.Trace(ctx, "Selected transcoding options", - "requestBitrate", reqBitRate, "requestFormat", reqFormat, - "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix, - "selectedBitrate", bitRate, "selectedFormat", format, - ) s := &Stream{ctx: ctx, mf: mf, format: format, bitRate: bitRate} if format == "raw" { - log.Debug(ctx, "Streaming raw file", "id", mf.ID, "path", mf.Path, + log.Debug(ctx, "Streaming RAW file", "id", mf.ID, "path", mf.Path, "requestBitrate", reqBitRate, "requestFormat", reqFormat, - "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix) + "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix, + "selectedBitrate", bitRate, "selectedFormat", format) f, err := os.Open(mf.Path) if err != nil { return nil, err @@ -71,70 +75,30 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat str return s, nil } - key := cacheKey(id, bitRate, format) - r, w, err := ms.cache.Get(key) + job := &streamJob{ + ms: ms, + mf: mf, + format: format, + bitRate: bitRate, + } + r, err := ms.cache.Get(ctx, job) if err != nil { - log.Error(ctx, "Error creating stream caching buffer", "id", mf.ID, err) + log.Error(ctx, "Error accessing cache", "id", mf.ID, err) return nil, err } - cached = w == nil - - // If this is a brand new transcoding request, not in the cache, start transcoding - if !cached { - log.Trace(ctx, "Cache miss. Starting new transcoding session", "id", mf.ID) - t, err := ms.ds.Transcoding(ctx).FindByFormat(format) - if err != nil { - log.Error(ctx, "Error loading transcoding command", "format", format, err) - return nil, os.ErrInvalid - } - out, err := ms.ffm.Start(ctx, t.Command, mf.Path, bitRate) - if err != nil { - log.Error(ctx, "Error starting transcoder", "id", mf.ID, err) - return nil, os.ErrInvalid - } - go copyAndClose(ctx, w, out) - } - - // If it is in the cache, check if the stream is done being written. If so, return a ReaderSeeker - if cached { - size := getFinalCachedSize(r) - if size > 0 { - log.Debug(ctx, "Streaming cached file", "id", mf.ID, "path", mf.Path, - "requestBitrate", reqBitRate, "requestFormat", reqFormat, - "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix, "size", size) - sr := io.NewSectionReader(r, 0, size) - s.Reader = sr - s.Closer = r - s.Seeker = sr - s.format = format - return s, nil - } - } - - log.Debug(ctx, "Streaming transcoded file", "id", mf.ID, "path", mf.Path, + log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", mf.Path, "requestBitrate", reqBitRate, "requestFormat", reqFormat, - "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix) - // All other cases, just return a ReadCloser, without Seek capabilities + "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix, + "selectedBitrate", bitRate, "selectedFormat", format) + s.Reader = r s.Closer = r - s.format = format - return s, nil -} + if r.Seekable() { + s.Seeker = r + } -func copyAndClose(ctx context.Context, w io.WriteCloser, r io.ReadCloser) { - _, err := io.Copy(w, r) - if err != nil { - log.Error(ctx, "Error copying data to cache", err) - } - err = r.Close() - if err != nil { - log.Error(ctx, "Error closing transcode output", err) - } - err = w.Close() - if err != nil { - log.Error(ctx, "Error closing cache", err) - } + return s, nil } type Stream struct { @@ -202,21 +166,21 @@ func selectTranscodingOptions(ctx context.Context, ds model.DataStore, mf *model return } -func cacheKey(id string, bitRate int, format string) string { - return fmt.Sprintf("%s.%d.%s", id, bitRate, format) -} - -func getFinalCachedSize(r fscache.ReadAtCloser) int64 { - cr, ok := r.(*fscache.CacheReader) - if ok { - size, final, err := cr.Size() - if final && err == nil { - return size - } - } - return -1 -} - -func NewTranscodingCache() (TranscodingCache, error) { - return newFileCache("Transcoding", conf.Server.TranscodingCacheSize, consts.TranscodingCacheDir, consts.DefaultTranscodingCacheMaxItems) +func NewTranscodingCache() (*FileCache, error) { + return NewFileCache("Transcoding", conf.Server.TranscodingCacheSize, + consts.TranscodingCacheDir, consts.DefaultTranscodingCacheMaxItems, + func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { + job := arg.(*streamJob) + t, err := job.ms.ds.Transcoding(ctx).FindByFormat(job.format) + if err != nil { + log.Error(ctx, "Error loading transcoding command", "format", job.format, err) + return nil, os.ErrInvalid + } + out, err := job.ms.ffm.Start(ctx, t.Command, job.mf.Path, job.bitRate) + if err != nil { + log.Error(ctx, "Error starting transcoder", "id", job.mf.ID, err) + return nil, os.ErrInvalid + } + return out, nil + }) } diff --git a/core/media_streamer_test.go b/core/media_streamer_test.go index 294816265..d7ea3c195 100644 --- a/core/media_streamer_test.go +++ b/core/media_streamer_test.go @@ -3,8 +3,11 @@ package core import ( "context" "io" + "io/ioutil" + "os" "strings" + "github.com/deluan/navidrome/conf" "github.com/deluan/navidrome/log" "github.com/deluan/navidrome/model" "github.com/deluan/navidrome/model/request" @@ -18,13 +21,21 @@ var _ = Describe("MediaStreamer", func() { var ds model.DataStore ffmpeg := &fakeFFmpeg{Data: "fake data"} ctx := log.NewContext(context.TODO()) + log.SetLevel(log.LevelTrace) BeforeEach(func() { + conf.Server.DataFolder, _ = ioutil.TempDir("", "file_caches") + conf.Server.TranscodingCacheSize = "100MB" ds = &persistence.MockDataStore{MockedTranscoding: &mockTranscodingRepository{}} ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "suffix": "mp3", "bitRate": 128, "duration": 257.0}]`) + testCache, _ := NewTranscodingCache() streamer = NewMediaStreamer(ds, ffmpeg, testCache) }) + AfterEach(func() { + os.RemoveAll(conf.Server.DataFolder) + }) + Context("NewStream", func() { It("returns a seekable stream if format is 'raw'", func() { s, err := streamer.NewStream(ctx, "123", "raw", 0) @@ -48,8 +59,11 @@ var _ = Describe("MediaStreamer", func() { Expect(s.Duration()).To(Equal(float32(257.0))) }) It("returns a seekable stream if the file is complete in the cache", func() { + s, err := streamer.NewStream(ctx, "123", "mp3", 32) + Expect(err).To(BeNil()) Eventually(func() bool { return ffmpeg.closed }, "3s").Should(BeTrue()) - s, err := streamer.NewStream(ctx, "123", "mp3", 64) + + s, err = streamer.NewStream(ctx, "123", "mp3", 32) Expect(err).To(BeNil()) Expect(s.Seekable()).To(BeTrue()) })