diff --git a/core/external_metadata.go b/core/external_metadata.go index 17361c94d..50310ed73 100644 --- a/core/external_metadata.go +++ b/core/external_metadata.go @@ -25,6 +25,9 @@ import ( const ( unavailableArtistID = "-1" maxSimilarArtists = 100 + refreshDelay = 5 * time.Second + refreshTimeout = 15 * time.Second + refreshQueueLength = 2000 ) type ExternalMetadata interface { @@ -37,8 +40,10 @@ type ExternalMetadata interface { } type externalMetadata struct { - ds model.DataStore - ag *agents.Agents + ds model.DataStore + ag *agents.Agents + artistQueue chan<- *auxArtist + albumQueue chan<- *auxAlbum } type auxAlbum struct { @@ -52,7 +57,10 @@ type auxArtist struct { } func NewExternalMetadata(ds model.DataStore, agents *agents.Agents) ExternalMetadata { - return &externalMetadata{ds: ds, ag: agents} + e := &externalMetadata{ds: ds, ag: agents} + e.artistQueue = startRefreshQueue(context.TODO(), e.populateArtistInfo) + e.albumQueue = startRefreshQueue(context.TODO(), e.populateAlbumInfo) + return e } func (e *externalMetadata) getAlbum(ctx context.Context, id string) (*auxAlbum, error) { @@ -84,7 +92,7 @@ func (e *externalMetadata) UpdateAlbumInfo(ctx context.Context, id string) (*mod if album.ExternalInfoUpdatedAt.IsZero() { log.Debug(ctx, "AlbumInfo not cached. Retrieving it now", "updatedAt", album.ExternalInfoUpdatedAt, "id", id, "name", album.Name) - err = e.refreshAlbumInfo(ctx, album) + err = e.populateAlbumInfo(ctx, album) if err != nil { return nil, err } @@ -92,25 +100,21 @@ func (e *externalMetadata) UpdateAlbumInfo(ctx context.Context, id string) (*mod if time.Since(album.ExternalInfoUpdatedAt) > conf.Server.DevAlbumInfoTimeToLive { log.Debug("Found expired cached AlbumInfo, refreshing in the background", "updatedAt", album.ExternalInfoUpdatedAt, "name", album.Name) - go func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - err := e.refreshAlbumInfo(ctx, album) - if err != nil { - log.Error("Error refreshing AlbumInfo", "id", id, "name", album.Name, err) - } - }() + enqueueRefresh(e.albumQueue, album) } return &album.Album, nil } -func (e *externalMetadata) refreshAlbumInfo(ctx context.Context, album *auxAlbum) error { +func (e *externalMetadata) populateAlbumInfo(ctx context.Context, album *auxAlbum) error { + start := time.Now() info, err := e.ag.GetAlbumInfo(ctx, album.Name, album.AlbumArtist, album.MbzAlbumID) if errors.Is(err, agents.ErrNotFound) { return nil } if err != nil { + log.Error("Error refreshing AlbumInfo", "id", album.ID, "name", album.Name, "artist", album.AlbumArtist, + "elapsed", time.Since(start), err) return err } @@ -139,10 +143,12 @@ func (e *externalMetadata) refreshAlbumInfo(ctx context.Context, album *auxAlbum err = e.ds.Album(ctx).Put(&album.Album) if err != nil { - log.Error(ctx, "Error trying to update album external information", "id", album.ID, "name", album.Name, err) + log.Error(ctx, "Error trying to update album external information", "id", album.ID, "name", album.Name, + "elapsed", time.Since(start), err) + } else { + log.Trace(ctx, "AlbumInfo collected", "album", album, "elapsed", time.Since(start)) } - log.Trace(ctx, "AlbumInfo collected", "album", album) return nil } @@ -207,19 +213,13 @@ func (e *externalMetadata) refreshArtistInfo(ctx context.Context, id string) (*a // If info is expired, trigger a populateArtistInfo in the background if time.Since(artist.ExternalInfoUpdatedAt) > conf.Server.DevArtistInfoTimeToLive { log.Debug("Found expired cached ArtistInfo, refreshing in the background", "updatedAt", artist.ExternalInfoUpdatedAt, "name", artist.Name) - go func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - err := e.populateArtistInfo(ctx, artist) - if err != nil { - log.Error("Error refreshing ArtistInfo", "id", id, "name", artist.Name, err) - } - }() + enqueueRefresh(e.artistQueue, artist) } return artist, nil } func (e *externalMetadata) populateArtistInfo(ctx context.Context, artist *auxArtist) error { + start := time.Now() // Get MBID first, if it is not yet available if artist.MbzArtistID == "" { mbid, err := e.ag.GetArtistMBID(ctx, artist.ID, artist.Name) @@ -237,17 +237,18 @@ func (e *externalMetadata) populateArtistInfo(ctx context.Context, artist *auxAr }) if utils.IsCtxDone(ctx) { - log.Warn(ctx, "ArtistInfo update canceled", ctx.Err()) + log.Warn(ctx, "ArtistInfo update canceled", "elapsed", "id", artist.ID, "name", artist.Name, time.Since(start), ctx.Err()) return ctx.Err() } artist.ExternalInfoUpdatedAt = time.Now() err := e.ds.Artist(ctx).Put(&artist.Artist) if err != nil { - log.Error(ctx, "Error trying to update artist external information", "id", artist.ID, "name", artist.Name, err) + log.Error(ctx, "Error trying to update artist external information", "id", artist.ID, "name", artist.Name, + "elapsed", time.Since(start), err) + } else { + log.Trace(ctx, "ArtistInfo collected", "artist", artist, "elapsed", time.Since(start)) } - - log.Trace(ctx, "ArtistInfo collected", "artist", artist) return nil } @@ -434,11 +435,11 @@ func (e *externalMetadata) findMatchingTrack(ctx context.Context, mbid string, a } func (e *externalMetadata) callGetURL(ctx context.Context, agent agents.ArtistURLRetriever, artist *auxArtist) { - url, err := agent.GetArtistURL(ctx, artist.ID, artist.Name, artist.MbzArtistID) - if url == "" || err != nil { + artisURL, err := agent.GetArtistURL(ctx, artist.ID, artist.Name, artist.MbzArtistID) + if artisURL == "" || err != nil { return } - artist.ExternalUrl = url + artist.ExternalUrl = artisURL } func (e *externalMetadata) callGetBiography(ctx context.Context, agent agents.ArtistBiographyRetriever, artist *auxArtist) { @@ -568,3 +569,29 @@ func (e *externalMetadata) loadSimilar(ctx context.Context, artist *auxArtist, c artist.SimilarArtists = loaded return nil } + +func startRefreshQueue[T any](ctx context.Context, processFn func(context.Context, T) error) chan<- T { + queue := make(chan T, refreshQueueLength) + go func() { + for { + time.Sleep(refreshDelay) + ctx, cancel := context.WithTimeout(ctx, refreshTimeout) + select { + case a := <-queue: + _ = processFn(ctx, a) + cancel() + case <-ctx.Done(): + cancel() + break + } + } + }() + return queue +} + +func enqueueRefresh[T any](queue chan<- T, item T) { + select { + case queue <- item: + default: // It is ok to miss a refresh + } +}