From 289da56f64021e9d523966fe807893e0576c81fd Mon Sep 17 00:00:00 2001
From: Deluan <deluan@navidrome.org>
Date: Thu, 24 Jun 2021 00:01:05 -0400
Subject: [PATCH] Implement Scrobble buffering/retrying

---
 cmd/root.go                                   |   2 +-
 core/agents/lastfm/agent.go                   |  59 +++++----
 core/agents/lastfm/agent_test.go              |  56 ++++++++-
 core/scrobbler/buffered_scrobbler.go          | 115 ++++++++++++++++++
 core/scrobbler/interfaces.go                  |   9 +-
 core/scrobbler/play_tracker.go                |  57 ++++-----
 core/scrobbler/play_tracker_test.go           |  51 ++++----
 .../20210616150710_encrypt_all_passwords.go   |   1 -
 ...add_user_prefs_player_scrobbler_enabled.go |   1 -
 .../20210626213026_add_scrobble_buffer.go     |  38 ++++++
 model/datastore.go                            |   1 +
 model/scrobble_buffer.go                      |  21 ++++
 persistence/persistence.go                    |   4 +
 persistence/scrobble_buffer_repository.go     |  83 +++++++++++++
 persistence/sql_base_repository.go            |   2 +-
 tests/mock_persistence.go                     |  28 +++--
 tests/mock_scrobble_buffer_repo.go            |  81 ++++++++++++
 17 files changed, 513 insertions(+), 96 deletions(-)
 create mode 100644 core/scrobbler/buffered_scrobbler.go
 create mode 100644 db/migration/20210626213026_add_scrobble_buffer.go
 create mode 100644 model/scrobble_buffer.go
 create mode 100644 persistence/scrobble_buffer_repository.go
 create mode 100644 tests/mock_scrobble_buffer_repo.go

diff --git a/cmd/root.go b/cmd/root.go
index e0290f688..8b8c1b2a9 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -74,8 +74,8 @@ func runNavidrome() {
 func startServer() (func() error, func(err error)) {
 	return func() error {
 			a := CreateServer(conf.Server.MusicFolder)
-			a.MountRouter("Subsonic API", consts.URLPathSubsonicAPI, CreateSubsonicAPIRouter())
 			a.MountRouter("Native API", consts.URLPathNativeAPI, CreateNativeAPIRouter())
+			a.MountRouter("Subsonic API", consts.URLPathSubsonicAPI, CreateSubsonicAPIRouter())
 			if conf.Server.DevEnableScrobble {
 				a.MountRouter("LastFM Auth", consts.URLPathNativeAPI+"/lastfm", CreateLastFMRouter())
 			}
diff --git a/core/agents/lastfm/agent.go b/core/agents/lastfm/agent.go
index 7b823e330..7095be5c8 100644
--- a/core/agents/lastfm/agent.go
+++ b/core/agents/lastfm/agent.go
@@ -160,9 +160,10 @@ func (l *lastfmAgent) callArtistGetTopTracks(ctx context.Context, artistName, mb
 
 func (l *lastfmAgent) NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error {
 	sk, err := l.sessionKeys.get(ctx, userId)
-	if err != nil {
-		return err
+	if err != nil || sk == "" {
+		return scrobbler.ErrNotAuthorized
 	}
+
 	err = l.client.UpdateNowPlaying(ctx, sk, ScrobbleInfo{
 		artist:      track.Artist,
 		track:       track.Title,
@@ -173,38 +174,44 @@ func (l *lastfmAgent) NowPlaying(ctx context.Context, userId string, track *mode
 		albumArtist: track.AlbumArtist,
 	})
 	if err != nil {
-		return err
+		log.Warn(ctx, "Last.fm client.updateNowPlaying returned error", "track", track.Title, err)
+		return scrobbler.ErrUnrecoverable
 	}
 	return nil
 }
 
-func (l *lastfmAgent) Scrobble(ctx context.Context, userId string, scrobbles []scrobbler.Scrobble) error {
+func (l *lastfmAgent) Scrobble(ctx context.Context, userId string, s scrobbler.Scrobble) error {
 	sk, err := l.sessionKeys.get(ctx, userId)
-	if err != nil {
-		return err
+	if err != nil || sk == "" {
+		return scrobbler.ErrNotAuthorized
 	}
 
-	// TODO Implement batch scrobbling
-	for _, s := range scrobbles {
-		if s.Duration <= 30 {
-			log.Debug(ctx, "Skipping Last.fm scrobble for short song", "track", s.Title, "duration", s.Duration)
-			continue
-		}
-		err = l.client.Scrobble(ctx, sk, ScrobbleInfo{
-			artist:      s.Artist,
-			track:       s.Title,
-			album:       s.Album,
-			trackNumber: s.TrackNumber,
-			mbid:        s.MbzTrackID,
-			duration:    int(s.Duration),
-			albumArtist: s.AlbumArtist,
-			timestamp:   s.TimeStamp,
-		})
-		if err != nil {
-			return err
-		}
+	if s.Duration <= 30 {
+		log.Debug(ctx, "Skipping Last.fm scrobble for short song", "track", s.Title, "duration", s.Duration)
+		return nil
 	}
-	return nil
+	err = l.client.Scrobble(ctx, sk, ScrobbleInfo{
+		artist:      s.Artist,
+		track:       s.Title,
+		album:       s.Album,
+		trackNumber: s.TrackNumber,
+		mbid:        s.MbzTrackID,
+		duration:    int(s.Duration),
+		albumArtist: s.AlbumArtist,
+		timestamp:   s.TimeStamp,
+	})
+	if err == nil {
+		return nil
+	}
+	lfErr, isLastFMError := err.(*lastFMError)
+	if !isLastFMError {
+		log.Warn(ctx, "Last.fm client.scrobble returned error", "track", s.Title, err)
+		return scrobbler.ErrRetryLater
+	}
+	if lfErr.Code == 11 || lfErr.Code == 16 {
+		return scrobbler.ErrRetryLater
+	}
+	return scrobbler.ErrUnrecoverable
 }
 
 func (l *lastfmAgent) IsAuthorized(ctx context.Context, userId string) bool {
diff --git a/core/agents/lastfm/agent_test.go b/core/agents/lastfm/agent_test.go
index c636359bd..f8ea2b370 100644
--- a/core/agents/lastfm/agent_test.go
+++ b/core/agents/lastfm/agent_test.go
@@ -264,15 +264,19 @@ var _ = Describe("lastfmAgent", func() {
 				Expect(sentParams.Get("duration")).To(Equal(strconv.FormatFloat(float64(track.Duration), 'G', -1, 32)))
 				Expect(sentParams.Get("mbid")).To(Equal(track.MbzTrackID))
 			})
+
+			It("returns ErrNotAuthorized if user is not linked", func() {
+				err := agent.NowPlaying(ctx, "user-2", track)
+				Expect(err).To(MatchError(scrobbler.ErrNotAuthorized))
+			})
 		})
 
 		Describe("Scrobble", func() {
 			It("calls Last.fm with correct params", func() {
 				ts := time.Now()
-				scrobbles := []scrobbler.Scrobble{{MediaFile: *track, TimeStamp: ts}}
 				httpClient.Res = http.Response{Body: ioutil.NopCloser(bytes.NewBufferString("{}")), StatusCode: 200}
 
-				err := agent.Scrobble(ctx, "user-1", scrobbles)
+				err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: ts})
 
 				Expect(err).ToNot(HaveOccurred())
 				Expect(httpClient.SavedRequest.Method).To(Equal(http.MethodPost))
@@ -291,14 +295,58 @@ var _ = Describe("lastfmAgent", func() {
 
 			It("skips songs with less than 31 seconds", func() {
 				track.Duration = 29
-				scrobbles := []scrobbler.Scrobble{{MediaFile: *track, TimeStamp: time.Now()}}
 				httpClient.Res = http.Response{Body: ioutil.NopCloser(bytes.NewBufferString("{}")), StatusCode: 200}
 
-				err := agent.Scrobble(ctx, "user-1", scrobbles)
+				err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
 
 				Expect(err).ToNot(HaveOccurred())
 				Expect(httpClient.SavedRequest).To(BeNil())
 			})
+
+			It("returns ErrNotAuthorized if user is not linked", func() {
+				err := agent.Scrobble(ctx, "user-2", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
+				Expect(err).To(MatchError(scrobbler.ErrNotAuthorized))
+			})
+
+			It("returns ErrRetryLater on error 11", func() {
+				httpClient.Res = http.Response{
+					Body:       ioutil.NopCloser(bytes.NewBufferString(`{"error":11,"message":"Service Offline - This service is temporarily offline. Try again later."}`)),
+					StatusCode: 400,
+				}
+
+				err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
+				Expect(err).To(MatchError(scrobbler.ErrRetryLater))
+			})
+
+			It("returns ErrRetryLater on error 16", func() {
+				httpClient.Res = http.Response{
+					Body:       ioutil.NopCloser(bytes.NewBufferString(`{"error":16,"message":"There was a temporary error processing your request. Please try again"}`)),
+					StatusCode: 400,
+				}
+
+				err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
+				Expect(err).To(MatchError(scrobbler.ErrRetryLater))
+			})
+
+			It("returns ErrRetryLater on http errors", func() {
+				httpClient.Res = http.Response{
+					Body:       ioutil.NopCloser(bytes.NewBufferString(`internal server error`)),
+					StatusCode: 500,
+				}
+
+				err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
+				Expect(err).To(MatchError(scrobbler.ErrRetryLater))
+			})
+
+			It("returns ErrUnrecoverable on other errors", func() {
+				httpClient.Res = http.Response{
+					Body:       ioutil.NopCloser(bytes.NewBufferString(`{"error":8,"message":"Operation failed - Something else went wrong"}`)),
+					StatusCode: 400,
+				}
+
+				err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
+				Expect(err).To(MatchError(scrobbler.ErrUnrecoverable))
+			})
 		})
 	})
 
diff --git a/core/scrobbler/buffered_scrobbler.go b/core/scrobbler/buffered_scrobbler.go
new file mode 100644
index 000000000..1d98293e4
--- /dev/null
+++ b/core/scrobbler/buffered_scrobbler.go
@@ -0,0 +1,115 @@
+package scrobbler
+
+import (
+	"context"
+	"time"
+
+	"github.com/navidrome/navidrome/log"
+	"github.com/navidrome/navidrome/model"
+)
+
+func NewBufferedScrobbler(ds model.DataStore, s Scrobbler, service string) *bufferedScrobbler {
+	b := &bufferedScrobbler{ds: ds, wrapped: s, service: service}
+	b.wakeSignal = make(chan struct{}, 1)
+	go b.run()
+	return b
+}
+
+type bufferedScrobbler struct {
+	ds         model.DataStore
+	wrapped    Scrobbler
+	service    string
+	wakeSignal chan struct{}
+}
+
+func (b *bufferedScrobbler) IsAuthorized(ctx context.Context, userId string) bool {
+	return b.wrapped.IsAuthorized(ctx, userId)
+}
+
+func (b *bufferedScrobbler) NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error {
+	return b.wrapped.NowPlaying(ctx, userId, track)
+}
+
+func (b *bufferedScrobbler) Scrobble(ctx context.Context, userId string, s Scrobble) error {
+	err := b.ds.ScrobbleBuffer(ctx).Enqueue(b.service, userId, s.ID, s.TimeStamp)
+	if err != nil {
+		return err
+	}
+
+	b.sendWakeSignal()
+	return nil
+}
+
+func (b *bufferedScrobbler) sendWakeSignal() {
+	// Don't block if the previous signal was not read yet
+	select {
+	case b.wakeSignal <- struct{}{}:
+	default:
+	}
+}
+
+func (b *bufferedScrobbler) run() {
+	ctx := context.Background()
+	for {
+		if !b.processQueue(ctx) {
+			time.AfterFunc(5*time.Second, func() {
+				b.sendWakeSignal()
+			})
+		}
+		<-b.wakeSignal
+	}
+}
+
+func (b *bufferedScrobbler) processQueue(ctx context.Context) bool {
+	buffer := b.ds.ScrobbleBuffer(ctx)
+	userIds, err := buffer.UserIDs(b.service)
+	if err != nil {
+		log.Error(ctx, "Error retrieving userIds from scrobble buffer", "scrobbler", b.service, err)
+		return false
+	}
+	result := true
+	for _, userId := range userIds {
+		if !b.processUserQueue(ctx, userId) {
+			result = false
+		}
+	}
+	return result
+}
+
+func (b *bufferedScrobbler) processUserQueue(ctx context.Context, userId string) bool {
+	buffer := b.ds.ScrobbleBuffer(ctx)
+	for {
+		entry, err := buffer.Next(b.service, userId)
+		if err != nil {
+			log.Error(ctx, "Error reading from scrobble buffer", "scrobbler", b.service, err)
+			return false
+		}
+		if entry == nil {
+			return true
+		}
+		log.Debug(ctx, "Sending scrobble", "scrobbler", b.service, "track", entry.Title, "artist", entry.Artist)
+		err = b.wrapped.Scrobble(ctx, entry.UserID, Scrobble{
+			MediaFile: entry.MediaFile,
+			TimeStamp: entry.PlayTime,
+		})
+		if err != nil {
+			switch err {
+			case ErrRetryLater:
+				log.Warn(ctx, "Could not send scrobble. Will be retried", "userId", entry.UserID,
+					"track", entry.Title, "artist", entry.Artist, "scrobbler", b.service, err)
+				return false
+			default:
+				log.Error(ctx, "Error sending scrobble to service. Discarding", "scrobbler", b.service,
+					"userId", entry.UserID, "artist", entry.Artist, "track", entry.Title, err)
+			}
+		}
+		err = buffer.Dequeue(entry)
+		if err != nil {
+			log.Error(ctx, "Error removing entry from scrobble buffer", "userId", entry.UserID,
+				"track", entry.Title, "artist", entry.Artist, "scrobbler", b.service, err)
+			return false
+		}
+	}
+}
+
+var _ Scrobbler = (*bufferedScrobbler)(nil)
diff --git a/core/scrobbler/interfaces.go b/core/scrobbler/interfaces.go
index 158931c89..90141f112 100644
--- a/core/scrobbler/interfaces.go
+++ b/core/scrobbler/interfaces.go
@@ -2,6 +2,7 @@ package scrobbler
 
 import (
 	"context"
+	"errors"
 	"time"
 
 	"github.com/navidrome/navidrome/model"
@@ -12,10 +13,16 @@ type Scrobble struct {
 	TimeStamp time.Time
 }
 
+var (
+	ErrNotAuthorized = errors.New("not authorized")
+	ErrRetryLater    = errors.New("retry later")
+	ErrUnrecoverable = errors.New("unrecoverable")
+)
+
 type Scrobbler interface {
 	IsAuthorized(ctx context.Context, userId string) bool
 	NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error
-	Scrobble(ctx context.Context, userId string, scrobbles []Scrobble) error
+	Scrobble(ctx context.Context, userId string, s Scrobble) error
 }
 
 type Constructor func(ds model.DataStore) Scrobbler
diff --git a/core/scrobbler/play_tracker.go b/core/scrobbler/play_tracker.go
index c3aaff087..7824c040b 100644
--- a/core/scrobbler/play_tracker.go
+++ b/core/scrobbler/play_tracker.go
@@ -39,9 +39,10 @@ type PlayTracker interface {
 }
 
 type playTracker struct {
-	ds      model.DataStore
-	broker  events.Broker
-	playMap *ttlcache.Cache
+	ds         model.DataStore
+	broker     events.Broker
+	playMap    *ttlcache.Cache
+	scrobblers map[string]Scrobbler
 }
 
 func GetPlayTracker(ds model.DataStore, broker events.Broker) PlayTracker {
@@ -49,7 +50,14 @@ func GetPlayTracker(ds model.DataStore, broker events.Broker) PlayTracker {
 		m := ttlcache.NewCache()
 		m.SkipTTLExtensionOnHit(true)
 		_ = m.SetTTL(nowPlayingExpire)
-		return &playTracker{ds: ds, playMap: m, broker: broker}
+		p := &playTracker{ds: ds, playMap: m, broker: broker}
+		p.scrobblers = make(map[string]Scrobbler)
+		for name, constructor := range constructors {
+			s := constructor(ds)
+			s = NewBufferedScrobbler(ds, s, name)
+			p.scrobblers[name] = s
+		}
+		return p
 	})
 	return instance.(*playTracker)
 }
@@ -78,15 +86,12 @@ func (p *playTracker) dispatchNowPlaying(ctx context.Context, userId string, tra
 		return
 	}
 	// TODO Parallelize
-	for name, constructor := range scrobblers {
-		err := func() error {
-			s := constructor(p.ds)
-			if !s.IsAuthorized(ctx, userId) {
-				return nil
-			}
-			log.Debug(ctx, "Sending NowPlaying info", "scrobbler", name, "track", t.Title, "artist", t.Artist)
-			return s.NowPlaying(ctx, userId, t)
-		}()
+	for name, s := range p.scrobblers {
+		if !s.IsAuthorized(ctx, userId) {
+			continue
+		}
+		log.Debug(ctx, "Sending NowPlaying info", "scrobbler", name, "track", t.Title, "artist", t.Artist)
+		err := s.NowPlaying(ctx, userId, t)
 		if err != nil {
 			log.Error(ctx, "Error sending NowPlayingInfo", "scrobbler", name, "track", t.Title, "artist", t.Artist, err)
 			return
@@ -161,17 +166,13 @@ func (p *playTracker) incPlay(ctx context.Context, track *model.MediaFile, times
 
 func (p *playTracker) dispatchScrobble(ctx context.Context, t *model.MediaFile, playTime time.Time) error {
 	u, _ := request.UserFrom(ctx)
-	scrobbles := []Scrobble{{MediaFile: *t, TimeStamp: playTime}}
-	// TODO Parallelize
-	for name, constructor := range scrobblers {
-		err := func() error {
-			s := constructor(p.ds)
-			if !s.IsAuthorized(ctx, u.ID) {
-				return nil
-			}
-			log.Debug(ctx, "Sending Scrobble", "scrobbler", name, "track", t.Title, "artist", t.Artist)
-			return s.Scrobble(ctx, u.ID, scrobbles)
-		}()
+	scrobble := Scrobble{MediaFile: *t, TimeStamp: playTime}
+	for name, s := range p.scrobblers {
+		if !s.IsAuthorized(ctx, u.ID) {
+			continue
+		}
+		log.Debug(ctx, "Buffering scrobble", "scrobbler", name, "track", t.Title, "artist", t.Artist)
+		err := s.Scrobble(ctx, u.ID, scrobble)
 		if err != nil {
 			log.Error(ctx, "Error sending Scrobble", "scrobbler", name, "track", t.Title, "artist", t.Artist, err)
 			return err
@@ -180,14 +181,14 @@ func (p *playTracker) dispatchScrobble(ctx context.Context, t *model.MediaFile,
 	return nil
 }
 
-var scrobblers map[string]Constructor
+var constructors map[string]Constructor
 
 func Register(name string, init Constructor) {
 	if !conf.Server.DevEnableScrobble {
 		return
 	}
-	if scrobblers == nil {
-		scrobblers = make(map[string]Constructor)
+	if constructors == nil {
+		constructors = make(map[string]Constructor)
 	}
-	scrobblers[name] = init
+	constructors[name] = init
 }
diff --git a/core/scrobbler/play_tracker_test.go b/core/scrobbler/play_tracker_test.go
index 76220c424..1d28f65a2 100644
--- a/core/scrobbler/play_tracker_test.go
+++ b/core/scrobbler/play_tracker_test.go
@@ -6,11 +6,9 @@ import (
 	"time"
 
 	"github.com/navidrome/navidrome/conf"
-
-	"github.com/navidrome/navidrome/server/events"
-
 	"github.com/navidrome/navidrome/model"
 	"github.com/navidrome/navidrome/model/request"
+	"github.com/navidrome/navidrome/server/events"
 	"github.com/navidrome/navidrome/tests"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
@@ -19,11 +17,11 @@ import (
 var _ = Describe("PlayTracker", func() {
 	var ctx context.Context
 	var ds model.DataStore
-	var broker PlayTracker
+	var tracker PlayTracker
 	var track model.MediaFile
 	var album model.Album
 	var artist model.Artist
-	var fake *fakeScrobbler
+	var fake fakeScrobbler
 
 	BeforeEach(func() {
 		conf.Server.DevEnableScrobble = true
@@ -31,11 +29,18 @@ var _ = Describe("PlayTracker", func() {
 		ctx = request.WithUser(ctx, model.User{ID: "u-1"})
 		ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: true})
 		ds = &tests.MockDataStore{}
-		broker = GetPlayTracker(ds, events.GetBroker())
-		fake = &fakeScrobbler{Authorized: true}
+		fake = fakeScrobbler{Authorized: true}
 		Register("fake", func(ds model.DataStore) Scrobbler {
-			return fake
+			return &fake
 		})
+		tracker = GetPlayTracker(ds, events.GetBroker())
+
+		// Remove buffering to simplify tests
+		for i, s := range tracker.(*playTracker).scrobblers {
+			if bs, ok := s.(*bufferedScrobbler); ok {
+				tracker.(*playTracker).scrobblers[i] = bs.wrapped
+			}
+		}
 
 		track = model.MediaFile{
 			ID:          "123",
@@ -58,7 +63,7 @@ var _ = Describe("PlayTracker", func() {
 
 	Describe("NowPlaying", func() {
 		It("sends track to agent", func() {
-			err := broker.NowPlaying(ctx, "player-1", "player-one", "123")
+			err := tracker.NowPlaying(ctx, "player-1", "player-one", "123")
 			Expect(err).ToNot(HaveOccurred())
 			Expect(fake.NowPlayingCalled).To(BeTrue())
 			Expect(fake.UserID).To(Equal("u-1"))
@@ -67,7 +72,7 @@ var _ = Describe("PlayTracker", func() {
 		It("does not send track to agent if user has not authorized", func() {
 			fake.Authorized = false
 
-			err := broker.NowPlaying(ctx, "player-1", "player-one", "123")
+			err := tracker.NowPlaying(ctx, "player-1", "player-one", "123")
 
 			Expect(err).ToNot(HaveOccurred())
 			Expect(fake.NowPlayingCalled).To(BeFalse())
@@ -75,7 +80,7 @@ var _ = Describe("PlayTracker", func() {
 		It("does not send track to agent if player is not enabled to send scrobbles", func() {
 			ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: false})
 
-			err := broker.NowPlaying(ctx, "player-1", "player-one", "123")
+			err := tracker.NowPlaying(ctx, "player-1", "player-one", "123")
 
 			Expect(err).ToNot(HaveOccurred())
 			Expect(fake.NowPlayingCalled).To(BeFalse())
@@ -91,11 +96,11 @@ var _ = Describe("PlayTracker", func() {
 			track2.ID = "456"
 			_ = ds.MediaFile(ctx).Put(&track)
 			ctx = request.WithUser(ctx, model.User{UserName: "user-1"})
-			_ = broker.NowPlaying(ctx, "player-1", "player-one", "123")
+			_ = tracker.NowPlaying(ctx, "player-1", "player-one", "123")
 			ctx = request.WithUser(ctx, model.User{UserName: "user-2"})
-			_ = broker.NowPlaying(ctx, "player-2", "player-two", "456")
+			_ = tracker.NowPlaying(ctx, "player-2", "player-two", "456")
 
-			playing, err := broker.GetNowPlaying(ctx)
+			playing, err := tracker.GetNowPlaying(ctx)
 
 			Expect(err).ToNot(HaveOccurred())
 			Expect(playing).To(HaveLen(2))
@@ -116,19 +121,19 @@ var _ = Describe("PlayTracker", func() {
 			ctx = request.WithUser(ctx, model.User{ID: "u-1", UserName: "user-1"})
 			ts := time.Now()
 
-			err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}})
+			err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}})
 
 			Expect(err).ToNot(HaveOccurred())
 			Expect(fake.ScrobbleCalled).To(BeTrue())
 			Expect(fake.UserID).To(Equal("u-1"))
-			Expect(fake.Scrobbles[0].ID).To(Equal("123"))
+			Expect(fake.LastScrobble.ID).To(Equal("123"))
 		})
 
 		It("increments play counts in the DB", func() {
 			ctx = request.WithUser(ctx, model.User{ID: "u-1", UserName: "user-1"})
 			ts := time.Now()
 
-			err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}})
+			err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}})
 
 			Expect(err).ToNot(HaveOccurred())
 			Expect(track.PlayCount).To(Equal(int64(1)))
@@ -139,7 +144,7 @@ var _ = Describe("PlayTracker", func() {
 		It("does not send track to agent if user has not authorized", func() {
 			fake.Authorized = false
 
-			err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
+			err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
 
 			Expect(err).ToNot(HaveOccurred())
 			Expect(fake.ScrobbleCalled).To(BeFalse())
@@ -148,7 +153,7 @@ var _ = Describe("PlayTracker", func() {
 		It("does not send track to agent player is not enabled to send scrobbles", func() {
 			ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: false})
 
-			err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
+			err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
 
 			Expect(err).ToNot(HaveOccurred())
 			Expect(fake.ScrobbleCalled).To(BeFalse())
@@ -157,7 +162,7 @@ var _ = Describe("PlayTracker", func() {
 		It("increments play counts even if it cannot scrobble", func() {
 			fake.Error = errors.New("error")
 
-			err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
+			err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}})
 
 			Expect(err).ToNot(HaveOccurred())
 			Expect(fake.ScrobbleCalled).To(BeFalse())
@@ -177,7 +182,7 @@ type fakeScrobbler struct {
 	ScrobbleCalled   bool
 	UserID           string
 	Track            *model.MediaFile
-	Scrobbles        []Scrobble
+	LastScrobble     Scrobble
 	Error            error
 }
 
@@ -195,12 +200,12 @@ func (f *fakeScrobbler) NowPlaying(ctx context.Context, userId string, track *mo
 	return nil
 }
 
-func (f *fakeScrobbler) Scrobble(ctx context.Context, userId string, scrobbles []Scrobble) error {
+func (f *fakeScrobbler) Scrobble(ctx context.Context, userId string, s Scrobble) error {
 	f.ScrobbleCalled = true
 	if f.Error != nil {
 		return f.Error
 	}
 	f.UserID = userId
-	f.Scrobbles = scrobbles
+	f.LastScrobble = s
 	return nil
 }
diff --git a/db/migration/20210616150710_encrypt_all_passwords.go b/db/migration/20210616150710_encrypt_all_passwords.go
index afa7f39fe..d9a2be309 100644
--- a/db/migration/20210616150710_encrypt_all_passwords.go
+++ b/db/migration/20210616150710_encrypt_all_passwords.go
@@ -52,6 +52,5 @@ func upEncodeAllPasswords(tx *sql.Tx) error {
 }
 
 func downEncodeAllPasswords(tx *sql.Tx) error {
-	// This code is executed when the migration is rolled back.
 	return nil
 }
diff --git a/db/migration/20210623155401_add_user_prefs_player_scrobbler_enabled.go b/db/migration/20210623155401_add_user_prefs_player_scrobbler_enabled.go
index a95083eea..da63ecd7d 100644
--- a/db/migration/20210623155401_add_user_prefs_player_scrobbler_enabled.go
+++ b/db/migration/20210623155401_add_user_prefs_player_scrobbler_enabled.go
@@ -40,6 +40,5 @@ alter table player add scrobble_enabled bool default true;
 }
 
 func downAddUserPrefsPlayerScrobblerEnabled(tx *sql.Tx) error {
-	// This code is executed when the migration is rolled back.
 	return nil
 }
diff --git a/db/migration/20210626213026_add_scrobble_buffer.go b/db/migration/20210626213026_add_scrobble_buffer.go
new file mode 100644
index 000000000..4fe749686
--- /dev/null
+++ b/db/migration/20210626213026_add_scrobble_buffer.go
@@ -0,0 +1,38 @@
+package migrations
+
+import (
+	"database/sql"
+
+	"github.com/pressly/goose"
+)
+
+func init() {
+	goose.AddMigration(upAddScrobbleBuffer, downAddScrobbleBuffer)
+}
+
+func upAddScrobbleBuffer(tx *sql.Tx) error {
+	_, err := tx.Exec(`
+create table if not exists scrobble_buffer
+(
+	user_id varchar not null
+	constraint scrobble_buffer_user_id_fk
+		references user
+			on update cascade on delete cascade,
+	service varchar not null,
+	media_file_id varchar not null
+		constraint scrobble_buffer_media_file_id_fk
+			references media_file
+				on update cascade on delete cascade,
+	play_time datetime not null,
+	enqueue_time datetime not null default current_timestamp,
+	constraint scrobble_buffer_pk
+		unique (user_id, service, media_file_id, play_time, user_id)
+);
+`)
+
+	return err
+}
+
+func downAddScrobbleBuffer(tx *sql.Tx) error {
+	return nil
+}
diff --git a/model/datastore.go b/model/datastore.go
index 19b7f92e5..19a08c05a 100644
--- a/model/datastore.go
+++ b/model/datastore.go
@@ -33,6 +33,7 @@ type DataStore interface {
 	Property(ctx context.Context) PropertyRepository
 	User(ctx context.Context) UserRepository
 	UserProps(ctx context.Context) UserPropsRepository
+	ScrobbleBuffer(ctx context.Context) ScrobbleBufferRepository
 
 	Resource(ctx context.Context, model interface{}) ResourceRepository
 
diff --git a/model/scrobble_buffer.go b/model/scrobble_buffer.go
new file mode 100644
index 000000000..e75936c4e
--- /dev/null
+++ b/model/scrobble_buffer.go
@@ -0,0 +1,21 @@
+package model
+
+import "time"
+
+type ScrobbleEntry struct {
+	MediaFile
+	Service     string
+	UserID      string `json:"user_id" orm:"column(user_id)"`
+	PlayTime    time.Time
+	EnqueueTime time.Time
+}
+
+type ScrobbleEntries []ScrobbleEntry
+
+type ScrobbleBufferRepository interface {
+	UserIDs(service string) ([]string, error)
+	Enqueue(service, userId, mediaFileId string, playTime time.Time) error
+	Next(service string, userId string) (*ScrobbleEntry, error)
+	Dequeue(entry *ScrobbleEntry) error
+	Length() (int64, error)
+}
diff --git a/persistence/persistence.go b/persistence/persistence.go
index 57bdcc055..8ad96896a 100644
--- a/persistence/persistence.go
+++ b/persistence/persistence.go
@@ -70,6 +70,10 @@ func (s *SQLStore) Player(ctx context.Context) model.PlayerRepository {
 	return NewPlayerRepository(ctx, s.getOrmer())
 }
 
+func (s *SQLStore) ScrobbleBuffer(ctx context.Context) model.ScrobbleBufferRepository {
+	return NewScrobbleBufferRepository(ctx, s.getOrmer())
+}
+
 func (s *SQLStore) Resource(ctx context.Context, m interface{}) model.ResourceRepository {
 	switch m.(type) {
 	case model.User:
diff --git a/persistence/scrobble_buffer_repository.go b/persistence/scrobble_buffer_repository.go
new file mode 100644
index 000000000..1e32cceed
--- /dev/null
+++ b/persistence/scrobble_buffer_repository.go
@@ -0,0 +1,83 @@
+package persistence
+
+import (
+	"context"
+	"time"
+
+	. "github.com/Masterminds/squirrel"
+	"github.com/astaxie/beego/orm"
+	"github.com/navidrome/navidrome/model"
+)
+
+type scrobbleBufferRepository struct {
+	sqlRepository
+}
+
+func NewScrobbleBufferRepository(ctx context.Context, o orm.Ormer) model.ScrobbleBufferRepository {
+	r := &scrobbleBufferRepository{}
+	r.ctx = ctx
+	r.ormer = o
+	r.tableName = "scrobble_buffer"
+	return r
+}
+
+func (r *scrobbleBufferRepository) UserIDs(service string) ([]string, error) {
+	sql := Select().Columns("user_id").
+		From(r.tableName).
+		Where(And{
+			Eq{"service": service},
+		}).
+		GroupBy("user_id").
+		OrderBy("count(*)")
+	var userIds []string
+	err := r.queryAll(sql, &userIds)
+	return userIds, err
+}
+
+func (r *scrobbleBufferRepository) Enqueue(service, userId, mediaFileId string, playTime time.Time) error {
+	ins := Insert(r.tableName).SetMap(map[string]interface{}{
+		"user_id":       userId,
+		"service":       service,
+		"media_file_id": mediaFileId,
+		"play_time":     playTime,
+		"enqueue_time":  time.Now(),
+	})
+	_, err := r.executeSQL(ins)
+	return err
+}
+
+func (r *scrobbleBufferRepository) Next(service string, userId string) (*model.ScrobbleEntry, error) {
+	sql := Select().Columns("s.*, m.*").
+		From(r.tableName+" s").
+		LeftJoin("media_file m on m.id = s.media_file_id").
+		Where(And{
+			Eq{"service": service},
+			Eq{"user_id": userId},
+		}).
+		OrderBy("play_time", "s.rowid").Limit(1)
+
+	res := model.ScrobbleEntries{}
+	// TODO Rewrite queryOne to use QueryRows, to workaround the recursive embedded structs issue
+	err := r.queryAll(sql, &res)
+	if err == model.ErrNotFound || len(res) == 0 {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+	return &res[0], nil
+}
+
+func (r *scrobbleBufferRepository) Dequeue(entry *model.ScrobbleEntry) error {
+	return r.delete(And{
+		Eq{"service": entry.Service},
+		Eq{"media_file_id": entry.MediaFile.ID},
+		Eq{"play_time": entry.PlayTime},
+	})
+}
+
+func (r *scrobbleBufferRepository) Length() (int64, error) {
+	return r.count(Select())
+}
+
+var _ model.ScrobbleBufferRepository = (*scrobbleBufferRepository)(nil)
diff --git a/persistence/sql_base_repository.go b/persistence/sql_base_repository.go
index fd89f9fd0..ef5986339 100644
--- a/persistence/sql_base_repository.go
+++ b/persistence/sql_base_repository.go
@@ -149,7 +149,7 @@ func (r sqlRepository) queryOne(sq Sqlizer, response interface{}) error {
 	start := time.Now()
 	err = r.ormer.Raw(query, args...).QueryRow(response)
 	if err == orm.ErrNoRows {
-		r.logSQL(query, args, nil, 1, start)
+		r.logSQL(query, args, nil, 0, start)
 		return model.ErrNotFound
 	}
 	r.logSQL(query, args, err, 1, start)
diff --git a/tests/mock_persistence.go b/tests/mock_persistence.go
index 7150a0ece..e4517602c 100644
--- a/tests/mock_persistence.go
+++ b/tests/mock_persistence.go
@@ -7,16 +7,17 @@ import (
 )
 
 type MockDataStore struct {
-	MockedGenre       model.GenreRepository
-	MockedAlbum       model.AlbumRepository
-	MockedArtist      model.ArtistRepository
-	MockedMediaFile   model.MediaFileRepository
-	MockedUser        model.UserRepository
-	MockedProperty    model.PropertyRepository
-	MockedPlayer      model.PlayerRepository
-	MockedShare       model.ShareRepository
-	MockedTranscoding model.TranscodingRepository
-	MockedUserProps   model.UserPropsRepository
+	MockedGenre          model.GenreRepository
+	MockedAlbum          model.AlbumRepository
+	MockedArtist         model.ArtistRepository
+	MockedMediaFile      model.MediaFileRepository
+	MockedUser           model.UserRepository
+	MockedProperty       model.PropertyRepository
+	MockedPlayer         model.PlayerRepository
+	MockedShare          model.ShareRepository
+	MockedTranscoding    model.TranscodingRepository
+	MockedUserProps      model.UserPropsRepository
+	MockedScrobbleBuffer model.ScrobbleBufferRepository
 }
 
 func (db *MockDataStore) Album(context.Context) model.AlbumRepository {
@@ -101,6 +102,13 @@ func (db *MockDataStore) Player(context.Context) model.PlayerRepository {
 	return struct{ model.PlayerRepository }{}
 }
 
+func (db *MockDataStore) ScrobbleBuffer(ctx context.Context) model.ScrobbleBufferRepository {
+	if db.MockedScrobbleBuffer == nil {
+		db.MockedScrobbleBuffer = CreateMockedScrobbleBufferRepo()
+	}
+	return db.MockedScrobbleBuffer
+}
+
 func (db *MockDataStore) WithTx(block func(db model.DataStore) error) error {
 	return block(db)
 }
diff --git a/tests/mock_scrobble_buffer_repo.go b/tests/mock_scrobble_buffer_repo.go
new file mode 100644
index 000000000..06b28af75
--- /dev/null
+++ b/tests/mock_scrobble_buffer_repo.go
@@ -0,0 +1,81 @@
+package tests
+
+import (
+	"time"
+
+	"github.com/navidrome/navidrome/model"
+)
+
+type MockedScrobbleBufferRepo struct {
+	Error error
+	data  model.ScrobbleEntries
+}
+
+func CreateMockedScrobbleBufferRepo() *MockedScrobbleBufferRepo {
+	return &MockedScrobbleBufferRepo{}
+}
+
+func (m *MockedScrobbleBufferRepo) UserIDs(service string) ([]string, error) {
+	if m.Error != nil {
+		return nil, m.Error
+	}
+	userIds := make(map[string]struct{})
+	for _, e := range m.data {
+		if e.Service == service {
+			userIds[e.UserID] = struct{}{}
+		}
+	}
+	var result []string
+	for uid := range userIds {
+		result = append(result, uid)
+	}
+	return result, nil
+}
+
+func (m *MockedScrobbleBufferRepo) Enqueue(service, userId, mediaFileId string, playTime time.Time) error {
+	if m.Error != nil {
+		return m.Error
+	}
+	m.data = append(m.data, model.ScrobbleEntry{
+		MediaFile:   model.MediaFile{ID: mediaFileId},
+		Service:     service,
+		UserID:      userId,
+		PlayTime:    playTime,
+		EnqueueTime: time.Now(),
+	})
+	return nil
+}
+
+func (m *MockedScrobbleBufferRepo) Next(service, userId string) (*model.ScrobbleEntry, error) {
+	if m.Error != nil {
+		return nil, m.Error
+	}
+	for _, e := range m.data {
+		if e.Service == service && e.UserID == userId {
+			return &e, nil
+		}
+	}
+	return nil, nil
+}
+
+func (m *MockedScrobbleBufferRepo) Dequeue(entry *model.ScrobbleEntry) error {
+	if m.Error != nil {
+		return m.Error
+	}
+	newData := model.ScrobbleEntries{}
+	for _, e := range m.data {
+		if e.Service == entry.Service && e.UserID == entry.UserID && e.PlayTime == entry.PlayTime && e.MediaFile.ID == entry.MediaFile.ID {
+			continue
+		}
+		newData = append(newData, e)
+	}
+	m.data = newData
+	return nil
+}
+
+func (m *MockedScrobbleBufferRepo) Length() (int64, error) {
+	if m.Error != nil {
+		return 0, m.Error
+	}
+	return int64(len(m.data)), nil
+}