Fix timer going awry

This commit is contained in:
Deluan 2023-01-17 22:04:04 -05:00
parent feb774a149
commit 580e9ae4bd
5 changed files with 14 additions and 21 deletions

View File

@ -65,10 +65,11 @@ func (a *cacheWarmer) sendWakeSignal() {
func (a *cacheWarmer) run(ctx context.Context) { func (a *cacheWarmer) run(ctx context.Context) {
for { for {
time.AfterFunc(10*time.Second, func() { t := time.AfterFunc(10*time.Second, func() {
a.sendWakeSignal() a.sendWakeSignal()
}) })
<-a.wakeSignal <-a.wakeSignal
t.Stop()
// If cache not available, keep waiting // If cache not available, keep waiting
if !a.cache.Available(ctx) { if !a.cache.Available(ctx) {

View File

@ -27,7 +27,7 @@ var _ = Describe("MediaStreamer", func() {
{ID: "123", Path: "tests/fixtures/test.mp3", Suffix: "mp3", BitRate: 128, Duration: 257.0}, {ID: "123", Path: "tests/fixtures/test.mp3", Suffix: "mp3", BitRate: 128, Duration: 257.0},
}) })
testCache := GetTranscodingCache() testCache := GetTranscodingCache()
Eventually(func() bool { return testCache.Ready(context.TODO()) }).Should(BeTrue()) Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
}) })
AfterEach(func() { AfterEach(func() {
_ = os.RemoveAll(conf.Server.DataFolder) _ = os.RemoveAll(conf.Server.DataFolder)

View File

@ -30,7 +30,7 @@ var _ = Describe("MediaStreamer", func() {
{ID: "123", Path: "tests/fixtures/test.mp3", Suffix: "mp3", BitRate: 128, Duration: 257.0}, {ID: "123", Path: "tests/fixtures/test.mp3", Suffix: "mp3", BitRate: 128, Duration: 257.0},
}) })
testCache := core.GetTranscodingCache() testCache := core.GetTranscodingCache()
Eventually(func() bool { return testCache.Ready(context.TODO()) }).Should(BeTrue()) Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue())
streamer = core.NewMediaStreamer(ds, ffmpeg, testCache) streamer = core.NewMediaStreamer(ds, ffmpeg, testCache)
}) })
AfterEach(func() { AfterEach(func() {

View File

@ -14,6 +14,7 @@ import (
"github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/consts" "github.com/navidrome/navidrome/consts"
"github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/utils"
) )
type Item interface { type Item interface {
@ -24,7 +25,6 @@ type ReadFunc func(ctx context.Context, item Item) (io.Reader, error)
type FileCache interface { type FileCache interface {
Get(ctx context.Context, item Item) (*CachedStream, error) Get(ctx context.Context, item Item) (*CachedStream, error)
Ready(ctx context.Context) bool
Available(ctx context.Context) bool Available(ctx context.Context) bool
} }
@ -46,7 +46,7 @@ func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader R
fc.cache = cache fc.cache = cache
fc.disabled = cache == nil || err != nil fc.disabled = cache == nil || err != nil
log.Info("Finished initializing cache", "cache", fc.name, "maxSize", fc.cacheSize, "elapsedTime", time.Since(start)) log.Info("Finished initializing cache", "cache", fc.name, "maxSize", fc.cacheSize, "elapsedTime", time.Since(start))
fc.ready = true fc.ready.Set(true)
if err != nil { if err != nil {
log.Error(fmt.Sprintf("Cache %s will be DISABLED due to previous errors", "name"), fc.name, err) log.Error(fmt.Sprintf("Cache %s will be DISABLED due to previous errors", "name"), fc.name, err)
} }
@ -66,29 +66,20 @@ type fileCache struct {
cache fscache.Cache cache fscache.Cache
getReader ReadFunc getReader ReadFunc
disabled bool disabled bool
ready bool ready utils.AtomicBool
mutex *sync.RWMutex mutex *sync.RWMutex
} }
func (fc *fileCache) Ready(_ context.Context) bool { func (fc *fileCache) Available(_ context.Context) bool {
fc.mutex.RLock()
defer fc.mutex.RUnlock()
return fc.ready
}
func (fc *fileCache) Available(ctx context.Context) bool {
fc.mutex.RLock() fc.mutex.RLock()
defer fc.mutex.RUnlock() defer fc.mutex.RUnlock()
if !fc.ready { return fc.ready.Get() && !fc.disabled
log.Debug(ctx, "Cache not initialized yet", "cache", fc.name)
}
return fc.ready && !fc.disabled
} }
func (fc *fileCache) invalidate(ctx context.Context, key string) error { func (fc *fileCache) invalidate(ctx context.Context, key string) error {
if !fc.Available(ctx) { if !fc.Available(ctx) {
log.Debug(ctx, "Cache not initialized yet. Cannot invalidate key", "cache", fc.name, "key", key)
return nil return nil
} }
if !fc.cache.Exists(key) { if !fc.cache.Exists(key) {
@ -99,6 +90,7 @@ func (fc *fileCache) invalidate(ctx context.Context, key string) error {
func (fc *fileCache) Get(ctx context.Context, arg Item) (*CachedStream, error) { func (fc *fileCache) Get(ctx context.Context, arg Item) (*CachedStream, error) {
if !fc.Available(ctx) { if !fc.Available(ctx) {
log.Debug(ctx, "Cache not initialized yet. Reading data directly from reader", "cache", fc.name)
reader, err := fc.getReader(ctx, arg) reader, err := fc.getReader(ctx, arg)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -16,9 +16,9 @@ import (
// Call NewFileCache and wait for it to be ready // Call NewFileCache and wait for it to be ready
func callNewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache { func callNewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache {
fc := NewFileCache(name, cacheSize, cacheFolder, maxItems, getReader) fc := NewFileCache(name, cacheSize, cacheFolder, maxItems, getReader).(*fileCache)
Eventually(func() bool { return fc.Ready(context.Background()) }).Should(BeTrue()) Eventually(func() bool { return fc.ready.Get() }).Should(BeTrue())
return fc.(*fileCache) return fc
} }
var _ = Describe("File Caches", func() { var _ = Describe("File Caches", func() {