Deluan Quintão 66eaac2762
fix(plugins): add metrics on callbacks and improve plugin method calling (#4304)
* refactor: implement OnSchedulerCallback method in wasmSchedulerCallback

Added the OnSchedulerCallback method to the wasmSchedulerCallback struct, enabling it to handle scheduler callback events. This method constructs a SchedulerCallbackRequest and invokes the corresponding plugin method, facilitating better integration with the scheduling system. The changes improve the plugin's ability to respond to scheduled events, enhancing overall functionality.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): update executeCallback method to use callMethod

Modified the executeCallback method to accept an additional parameter,
methodName, which specifies the callback method to be executed. This change
ensures that the correct method is called for each WebSocket event,
improving the accuracy of callback execution for plugins.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): capture OnInit metrics

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): improve logging for metrics in callMethod

Updated the logging statement in the callMethod function to include the
elapsed time as a separate key in the log output. This change enhances
the clarity of the logged metrics, making it easier to analyze the
performance of plugin requests and troubleshoot any issues that may arise.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): enhance logging for schedule callback execution

Signed-off-by: Deluan <deluan@navidrome.org>

* refactor(server): streamline scrobbler stopping logic

Refactored the logic for stopping scrobbler instances when they are removed.
The new implementation introduces a `stoppableScrobbler` interface to
simplify the type assertion process, allowing for a more concise and
readable code structure. This change ensures that any scrobbler
implementing the `Stop` method is properly stopped before removal,
improving the overall reliability of the plugin management system.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): improve plugin lifecycle management and error handling

Enhanced the plugin lifecycle management by implementing error handling in the OnInit method. The changes include the addition of specific error conditions that can be returned during plugin initialization, allowing for better management of plugin states. Additionally, the unregisterPlugin method was updated to ensure proper cleanup of plugins that fail to initialize, improving overall stability and reliability of the plugin system.

Signed-off-by: Deluan <deluan@navidrome.org>

* refactor(plugins): remove unused LoadAllPlugins and related methods

Eliminated the LoadAllPlugins, LoadAllMediaAgents, and LoadAllScrobblers
methods from the manager implementation as they were not utilized in the codebase.
This cleanup reduces complexity and improves maintainability by removing
redundant code, allowing for a more streamlined plugin management process.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): update logging configuration for plugins

Configured logging for multiple plugins to remove timestamps and source file/line information, while adding specific prefixes for better identification.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): clear initialization state when unregistering a plugin

Added functionality to clear the initialization state of a plugin in the
lifecycle manager when it is unregistered. This change ensures that the
lifecycle state is accurately maintained, preventing potential issues with
plugins that may be re-registered after being unregistered. The new method
`clearInitialized` was implemented to handle this state management.

Signed-off-by: Deluan <deluan@navidrome.org>

* test: add unit tests for convertError function, rename to checkErr

Added comprehensive unit tests for the convertError function to ensure
correct behavior across various scenarios, including handling nil responses,
typed nils, and responses implementing errorResponse. These tests validate
that the function returns the expected results without panicking and
correctly wraps original errors when necessary.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): update plugin base implementation and method calls

Refactored the plugin base implementation by renaming `wasmBasePlugin` to `baseCapability` across multiple files. Updated method calls in the `wasmMediaAgent`, `wasmSchedulerCallback`, and `wasmScrobblerPlugin` to align with the new base structure. These changes improve code clarity and maintainability by standardizing the plugin architecture, ensuring consistent usage of the base capabilities across different plugin types.

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(discord): handle failed connections and improve heartbeat checks

Added a new method to clean up failed connections, which cancels the heartbeat schedule, closes the WebSocket connection, and removes cache entries. Enhanced the heartbeat check to log failures and trigger the cleanup process on the first failure. These changes ensure better management of user connections and improve the overall reliability of the RPC system.

Signed-off-by: Deluan <deluan@navidrome.org>

---------

Signed-off-by: Deluan <deluan@navidrome.org>
2025-07-05 09:03:49 -03:00

403 lines
13 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/navidrome/navidrome/plugins/api"
"github.com/navidrome/navidrome/plugins/host/cache"
"github.com/navidrome/navidrome/plugins/host/http"
"github.com/navidrome/navidrome/plugins/host/scheduler"
"github.com/navidrome/navidrome/plugins/host/websocket"
)
type discordRPC struct {
ws websocket.WebSocketService
web http.HttpService
mem cache.CacheService
sched scheduler.SchedulerService
}
// Discord WebSocket Gateway constants
const (
heartbeatOpCode = 1 // Heartbeat operation code
gateOpCode = 2 // Identify operation code
presenceOpCode = 3 // Presence update operation code
)
const (
heartbeatInterval = 41 // Heartbeat interval in seconds
defaultImage = "https://i.imgur.com/hb3XPzA.png"
)
// Activity is a struct that represents an activity in Discord.
type activity struct {
Name string `json:"name"`
Type int `json:"type"`
Details string `json:"details"`
State string `json:"state"`
Application string `json:"application_id"`
Timestamps activityTimestamps `json:"timestamps"`
Assets activityAssets `json:"assets"`
}
type activityTimestamps struct {
Start int64 `json:"start"`
End int64 `json:"end"`
}
type activityAssets struct {
LargeImage string `json:"large_image"`
LargeText string `json:"large_text"`
}
// PresencePayload is a struct that represents a presence update in Discord.
type presencePayload struct {
Activities []activity `json:"activities"`
Since int64 `json:"since"`
Status string `json:"status"`
Afk bool `json:"afk"`
}
// IdentifyPayload is a struct that represents an identify payload in Discord.
type identifyPayload struct {
Token string `json:"token"`
Intents int `json:"intents"`
Properties identifyProperties `json:"properties"`
}
type identifyProperties struct {
OS string `json:"os"`
Browser string `json:"browser"`
Device string `json:"device"`
}
func (r *discordRPC) processImage(ctx context.Context, imageURL string, clientID string, token string) (string, error) {
return r.processImageWithFallback(ctx, imageURL, clientID, token, false)
}
func (r *discordRPC) processImageWithFallback(ctx context.Context, imageURL string, clientID string, token string, isDefaultImage bool) (string, error) {
// Check if context is canceled
if err := ctx.Err(); err != nil {
return "", fmt.Errorf("context canceled: %w", err)
}
if imageURL == "" {
if isDefaultImage {
// We're already processing the default image and it's empty, return error
return "", fmt.Errorf("default image URL is empty")
}
return r.processImageWithFallback(ctx, defaultImage, clientID, token, true)
}
if strings.HasPrefix(imageURL, "mp:") {
return imageURL, nil
}
// Check cache first
cacheKey := fmt.Sprintf("discord.image.%x", imageURL)
cacheResp, _ := r.mem.GetString(ctx, &cache.GetRequest{Key: cacheKey})
if cacheResp.Exists {
log.Printf("Cache hit for image URL: %s", imageURL)
return cacheResp.Value, nil
}
resp, _ := r.web.Post(ctx, &http.HttpRequest{
Url: fmt.Sprintf("https://discord.com/api/v9/applications/%s/external-assets", clientID),
Headers: map[string]string{
"Authorization": token,
"Content-Type": "application/json",
},
Body: fmt.Appendf(nil, `{"urls":[%q]}`, imageURL),
})
// Handle HTTP error responses
if resp.Status >= 400 {
if isDefaultImage {
return "", fmt.Errorf("failed to process default image: HTTP %d %s", resp.Status, resp.Error)
}
return r.processImageWithFallback(ctx, defaultImage, clientID, token, true)
}
if resp.Error != "" {
if isDefaultImage {
// If we're already processing the default image and it fails, return error
return "", fmt.Errorf("failed to process default image: %s", resp.Error)
}
// Try with default image
return r.processImageWithFallback(ctx, defaultImage, clientID, token, true)
}
var data []map[string]string
if err := json.Unmarshal(resp.Body, &data); err != nil {
if isDefaultImage {
// If we're already processing the default image and it fails, return error
return "", fmt.Errorf("failed to unmarshal default image response: %w", err)
}
// Try with default image
return r.processImageWithFallback(ctx, defaultImage, clientID, token, true)
}
if len(data) == 0 {
if isDefaultImage {
// If we're already processing the default image and it fails, return error
return "", fmt.Errorf("no data returned for default image")
}
// Try with default image
return r.processImageWithFallback(ctx, defaultImage, clientID, token, true)
}
image := data[0]["external_asset_path"]
if image == "" {
if isDefaultImage {
// If we're already processing the default image and it fails, return error
return "", fmt.Errorf("empty external_asset_path for default image")
}
// Try with default image
return r.processImageWithFallback(ctx, defaultImage, clientID, token, true)
}
processedImage := fmt.Sprintf("mp:%s", image)
// Cache the processed image URL
var ttl = 4 * time.Hour // 4 hours for regular images
if isDefaultImage {
ttl = 48 * time.Hour // 48 hours for default image
}
_, _ = r.mem.SetString(ctx, &cache.SetStringRequest{
Key: cacheKey,
Value: processedImage,
TtlSeconds: int64(ttl.Seconds()),
})
log.Printf("Cached processed image URL for %s (TTL: %s seconds)", imageURL, ttl)
return processedImage, nil
}
func (r *discordRPC) sendActivity(ctx context.Context, clientID, username, token string, data activity) error {
log.Printf("Sending activity to for user %s: %#v", username, data)
processedImage, err := r.processImage(ctx, data.Assets.LargeImage, clientID, token)
if err != nil {
log.Printf("Failed to process image for user %s, continuing without image: %v", username, err)
// Clear the image and continue without it
data.Assets.LargeImage = ""
} else {
log.Printf("Processed image for URL %s: %s", data.Assets.LargeImage, processedImage)
data.Assets.LargeImage = processedImage
}
presence := presencePayload{
Activities: []activity{data},
Status: "dnd",
Afk: false,
}
return r.sendMessage(ctx, username, presenceOpCode, presence)
}
func (r *discordRPC) clearActivity(ctx context.Context, username string) error {
log.Printf("Clearing activity for user %s", username)
return r.sendMessage(ctx, username, presenceOpCode, presencePayload{})
}
func (r *discordRPC) sendMessage(ctx context.Context, username string, opCode int, payload any) error {
message := map[string]any{
"op": opCode,
"d": payload,
}
b, err := json.Marshal(message)
if err != nil {
return fmt.Errorf("failed to marshal presence update: %w", err)
}
resp, _ := r.ws.SendText(ctx, &websocket.SendTextRequest{
ConnectionId: username,
Message: string(b),
})
if resp.Error != "" {
return fmt.Errorf("failed to send presence update: %s", resp.Error)
}
return nil
}
func (r *discordRPC) getDiscordGateway(ctx context.Context) (string, error) {
resp, _ := r.web.Get(ctx, &http.HttpRequest{
Url: "https://discord.com/api/gateway",
})
if resp.Error != "" {
return "", fmt.Errorf("failed to get Discord gateway: %s", resp.Error)
}
var result map[string]string
err := json.Unmarshal(resp.Body, &result)
if err != nil {
return "", fmt.Errorf("failed to parse Discord gateway response: %w", err)
}
return result["url"], nil
}
func (r *discordRPC) sendHeartbeat(ctx context.Context, username string) error {
resp, _ := r.mem.GetInt(ctx, &cache.GetRequest{
Key: fmt.Sprintf("discord.seq.%s", username),
})
log.Printf("Sending heartbeat for user %s: %d", username, resp.Value)
return r.sendMessage(ctx, username, heartbeatOpCode, resp.Value)
}
func (r *discordRPC) cleanupFailedConnection(ctx context.Context, username string) {
log.Printf("Cleaning up failed connection for user %s", username)
// Cancel the heartbeat schedule
if resp, _ := r.sched.CancelSchedule(ctx, &scheduler.CancelRequest{ScheduleId: username}); resp.Error != "" {
log.Printf("Failed to cancel heartbeat schedule for user %s: %s", username, resp.Error)
}
// Close the WebSocket connection
if resp, _ := r.ws.Close(ctx, &websocket.CloseRequest{
ConnectionId: username,
Code: 1000,
Reason: "Connection lost",
}); resp.Error != "" {
log.Printf("Failed to close WebSocket connection for user %s: %s", username, resp.Error)
}
// Clean up cache entries (just the sequence number, no failure tracking needed)
_, _ = r.mem.Remove(ctx, &cache.RemoveRequest{Key: fmt.Sprintf("discord.seq.%s", username)})
log.Printf("Cleaned up connection for user %s", username)
}
func (r *discordRPC) isConnected(ctx context.Context, username string) bool {
// Try to send a heartbeat to test the connection
err := r.sendHeartbeat(ctx, username)
if err != nil {
log.Printf("Heartbeat test failed for user %s: %v", username, err)
return false
}
return true
}
func (r *discordRPC) connect(ctx context.Context, username string, token string) error {
if r.isConnected(ctx, username) {
log.Printf("Reusing existing connection for user %s", username)
return nil
}
log.Printf("Creating new connection for user %s", username)
// Get Discord Gateway URL
gateway, err := r.getDiscordGateway(ctx)
if err != nil {
return fmt.Errorf("failed to get Discord gateway: %w", err)
}
log.Printf("Using gateway: %s", gateway)
// Connect to Discord Gateway
resp, _ := r.ws.Connect(ctx, &websocket.ConnectRequest{
ConnectionId: username,
Url: gateway,
})
if resp.Error != "" {
return fmt.Errorf("failed to connect to WebSocket: %s", resp.Error)
}
// Send identify payload
payload := identifyPayload{
Token: token,
Intents: 0,
Properties: identifyProperties{
OS: "Windows 10",
Browser: "Discord Client",
Device: "Discord Client",
},
}
err = r.sendMessage(ctx, username, gateOpCode, payload)
if err != nil {
return fmt.Errorf("failed to send identify payload: %w", err)
}
// Schedule heartbeats for this user/connection
cronResp, _ := r.sched.ScheduleRecurring(ctx, &scheduler.ScheduleRecurringRequest{
CronExpression: fmt.Sprintf("@every %ds", heartbeatInterval),
ScheduleId: username,
})
log.Printf("Scheduled heartbeat for user %s with ID %s", username, cronResp.ScheduleId)
log.Printf("Successfully authenticated user %s", username)
return nil
}
func (r *discordRPC) disconnect(ctx context.Context, username string) error {
if resp, _ := r.sched.CancelSchedule(ctx, &scheduler.CancelRequest{ScheduleId: username}); resp.Error != "" {
return fmt.Errorf("failed to cancel schedule: %s", resp.Error)
}
resp, _ := r.ws.Close(ctx, &websocket.CloseRequest{
ConnectionId: username,
Code: 1000,
Reason: "Navidrome disconnect",
})
if resp.Error != "" {
return fmt.Errorf("failed to close WebSocket connection: %s", resp.Error)
}
return nil
}
func (r *discordRPC) OnTextMessage(ctx context.Context, req *api.OnTextMessageRequest) (*api.OnTextMessageResponse, error) {
if len(req.Message) < 1024 {
log.Printf("Received WebSocket message for connection '%s': %s", req.ConnectionId, req.Message)
} else {
log.Printf("Received WebSocket message for connection '%s' (truncated): %s...", req.ConnectionId, req.Message[:1021])
}
// Parse the message. If it's a heartbeat_ack, store the sequence number.
message := map[string]any{}
err := json.Unmarshal([]byte(req.Message), &message)
if err != nil {
return nil, fmt.Errorf("failed to parse WebSocket message: %w", err)
}
if v := message["s"]; v != nil {
seq := int64(v.(float64))
log.Printf("Received heartbeat_ack for connection '%s': %d", req.ConnectionId, seq)
resp, _ := r.mem.SetInt(ctx, &cache.SetIntRequest{
Key: fmt.Sprintf("discord.seq.%s", req.ConnectionId),
Value: seq,
TtlSeconds: heartbeatInterval * 2,
})
if !resp.Success {
return nil, fmt.Errorf("failed to store sequence number for user %s", req.ConnectionId)
}
}
return nil, nil
}
func (r *discordRPC) OnBinaryMessage(_ context.Context, req *api.OnBinaryMessageRequest) (*api.OnBinaryMessageResponse, error) {
log.Printf("Received unexpected binary message for connection '%s'", req.ConnectionId)
return nil, nil
}
func (r *discordRPC) OnError(_ context.Context, req *api.OnErrorRequest) (*api.OnErrorResponse, error) {
log.Printf("WebSocket error for connection '%s': %s", req.ConnectionId, req.Error)
return nil, nil
}
func (r *discordRPC) OnClose(_ context.Context, req *api.OnCloseRequest) (*api.OnCloseResponse, error) {
log.Printf("WebSocket connection '%s' closed with code %d: %s", req.ConnectionId, req.Code, req.Reason)
return nil, nil
}
func (r *discordRPC) OnSchedulerCallback(ctx context.Context, req *api.SchedulerCallbackRequest) (*api.SchedulerCallbackResponse, error) {
err := r.sendHeartbeat(ctx, req.ScheduleId)
if err != nil {
// On first heartbeat failure, immediately clean up the connection
// The next NowPlaying call will reconnect if needed
log.Printf("Heartbeat failed for user %s, cleaning up connection: %v", req.ScheduleId, err)
r.cleanupFailedConnection(ctx, req.ScheduleId)
return nil, fmt.Errorf("heartbeat failed, connection cleaned up: %w", err)
}
return nil, nil
}