mirror of
https://github.com/navidrome/navidrome.git
synced 2025-07-13 23:21:21 +03:00
* refactor: implement OnSchedulerCallback method in wasmSchedulerCallback Added the OnSchedulerCallback method to the wasmSchedulerCallback struct, enabling it to handle scheduler callback events. This method constructs a SchedulerCallbackRequest and invokes the corresponding plugin method, facilitating better integration with the scheduling system. The changes improve the plugin's ability to respond to scheduled events, enhancing overall functionality. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): update executeCallback method to use callMethod Modified the executeCallback method to accept an additional parameter, methodName, which specifies the callback method to be executed. This change ensures that the correct method is called for each WebSocket event, improving the accuracy of callback execution for plugins. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): capture OnInit metrics Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): improve logging for metrics in callMethod Updated the logging statement in the callMethod function to include the elapsed time as a separate key in the log output. This change enhances the clarity of the logged metrics, making it easier to analyze the performance of plugin requests and troubleshoot any issues that may arise. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): enhance logging for schedule callback execution Signed-off-by: Deluan <deluan@navidrome.org> * refactor(server): streamline scrobbler stopping logic Refactored the logic for stopping scrobbler instances when they are removed. The new implementation introduces a `stoppableScrobbler` interface to simplify the type assertion process, allowing for a more concise and readable code structure. This change ensures that any scrobbler implementing the `Stop` method is properly stopped before removal, improving the overall reliability of the plugin management system. 
Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): improve plugin lifecycle management and error handling Enhanced the plugin lifecycle management by implementing error handling in the OnInit method. The changes include the addition of specific error conditions that can be returned during plugin initialization, allowing for better management of plugin states. Additionally, the unregisterPlugin method was updated to ensure proper cleanup of plugins that fail to initialize, improving overall stability and reliability of the plugin system. Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): remove unused LoadAllPlugins and related methods Eliminated the LoadAllPlugins, LoadAllMediaAgents, and LoadAllScrobblers methods from the manager implementation as they were not utilized in the codebase. This cleanup reduces complexity and improves maintainability by removing redundant code, allowing for a more streamlined plugin management process. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): update logging configuration for plugins Configured logging for multiple plugins to remove timestamps and source file/line information, while adding specific prefixes for better identification. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): clear initialization state when unregistering a plugin Added functionality to clear the initialization state of a plugin in the lifecycle manager when it is unregistered. This change ensures that the lifecycle state is accurately maintained, preventing potential issues with plugins that may be re-registered after being unregistered. The new method `clearInitialized` was implemented to handle this state management. 
Signed-off-by: Deluan <deluan@navidrome.org> * test: add unit tests for convertError function, rename to checkErr Added comprehensive unit tests for the convertError function to ensure correct behavior across various scenarios, including handling nil responses, typed nils, and responses implementing errorResponse. These tests validate that the function returns the expected results without panicking and correctly wraps original errors when necessary. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): update plugin base implementation and method calls Refactored the plugin base implementation by renaming `wasmBasePlugin` to `baseCapability` across multiple files. Updated method calls in the `wasmMediaAgent`, `wasmSchedulerCallback`, and `wasmScrobblerPlugin` to align with the new base structure. These changes improve code clarity and maintainability by standardizing the plugin architecture, ensuring consistent usage of the base capabilities across different plugin types. Signed-off-by: Deluan <deluan@navidrome.org> * fix(discord): handle failed connections and improve heartbeat checks Added a new method to clean up failed connections, which cancels the heartbeat schedule, closes the WebSocket connection, and removes cache entries. Enhanced the heartbeat check to log failures and trigger the cleanup process on the first failure. These changes ensure better management of user connections and improve the overall reliability of the RPC system. Signed-off-by: Deluan <deluan@navidrome.org> --------- Signed-off-by: Deluan <deluan@navidrome.org>
324 lines
10 KiB
Go
324 lines
10 KiB
Go
package plugins
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
gonanoid "github.com/matoous/go-nanoid/v2"
|
|
"github.com/navidrome/navidrome/log"
|
|
"github.com/navidrome/navidrome/plugins/host/scheduler"
|
|
navidsched "github.com/navidrome/navidrome/scheduler"
|
|
)
|
|
|
|
// Schedule kinds stored in ScheduledCallback.Type.
const (
	// ScheduleTypeOneTime marks a schedule that fires once after a delay.
	ScheduleTypeOneTime = "one-time"
	// ScheduleTypeRecurring marks a schedule driven by a cron expression.
	ScheduleTypeRecurring = "recurring"
)
|
|
|
|
// ScheduledCallback is the bookkeeping record kept for every schedule a plugin
// has registered.
type ScheduledCallback struct {
	// ID is the schedule ID as seen by the plugin (no plugin prefix).
	ID string
	// PluginID is the plugin that registered the schedule.
	PluginID string
	// Type is "one-time" or "recurring".
	Type string
	// Payload is passed back to the plugin when the schedule fires.
	Payload []byte
	// EntryID is the scheduler entry backing a recurring schedule.
	EntryID int
	// Cancel cancels the context of a pending one-time schedule.
	Cancel context.CancelFunc
}
|
|
|
|
// SchedulerHostFunctions implements the scheduler.SchedulerService interface
|
|
type SchedulerHostFunctions struct {
|
|
ss *schedulerService
|
|
pluginID string
|
|
}
|
|
|
|
func (s SchedulerHostFunctions) ScheduleOneTime(ctx context.Context, req *scheduler.ScheduleOneTimeRequest) (*scheduler.ScheduleResponse, error) {
|
|
return s.ss.scheduleOneTime(ctx, s.pluginID, req)
|
|
}
|
|
|
|
func (s SchedulerHostFunctions) ScheduleRecurring(ctx context.Context, req *scheduler.ScheduleRecurringRequest) (*scheduler.ScheduleResponse, error) {
|
|
return s.ss.scheduleRecurring(ctx, s.pluginID, req)
|
|
}
|
|
|
|
func (s SchedulerHostFunctions) CancelSchedule(ctx context.Context, req *scheduler.CancelRequest) (*scheduler.CancelResponse, error) {
|
|
return s.ss.cancelSchedule(ctx, s.pluginID, req)
|
|
}
|
|
|
|
type schedulerService struct {
|
|
// Map of schedule IDs to their callback info
|
|
schedules map[string]*ScheduledCallback
|
|
manager *managerImpl
|
|
navidSched navidsched.Scheduler // Navidrome scheduler for recurring jobs
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// newSchedulerService creates a new schedulerService instance
|
|
func newSchedulerService(manager *managerImpl) *schedulerService {
|
|
return &schedulerService{
|
|
schedules: make(map[string]*ScheduledCallback),
|
|
manager: manager,
|
|
navidSched: navidsched.GetInstance(),
|
|
}
|
|
}
|
|
|
|
func (s *schedulerService) HostFunctions(pluginID string) SchedulerHostFunctions {
|
|
return SchedulerHostFunctions{
|
|
ss: s,
|
|
pluginID: pluginID,
|
|
}
|
|
}
|
|
|
|
// Safe accessor methods for tests
|
|
|
|
// hasSchedule safely checks if a schedule exists
|
|
func (s *schedulerService) hasSchedule(id string) bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
_, exists := s.schedules[id]
|
|
return exists
|
|
}
|
|
|
|
// scheduleCount safely returns the number of schedules
|
|
func (s *schedulerService) scheduleCount() int {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return len(s.schedules)
|
|
}
|
|
|
|
// getScheduleType safely returns the type of a schedule
|
|
func (s *schedulerService) getScheduleType(id string) string {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if cb, exists := s.schedules[id]; exists {
|
|
return cb.Type
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// scheduleJob is a helper function that handles the common logic for scheduling jobs
|
|
func (s *schedulerService) scheduleJob(pluginID string, scheduleId string, jobType string, payload []byte) (string, *ScheduledCallback, context.CancelFunc, error) {
|
|
if s.manager == nil {
|
|
return "", nil, nil, fmt.Errorf("scheduler service not properly initialized")
|
|
}
|
|
|
|
// Original scheduleId (what the plugin will see)
|
|
originalScheduleId := scheduleId
|
|
if originalScheduleId == "" {
|
|
// Generate a random ID if one wasn't provided
|
|
originalScheduleId, _ = gonanoid.New(10)
|
|
}
|
|
|
|
// Internal scheduleId (prefixed with plugin name to avoid conflicts)
|
|
internalScheduleId := pluginID + ":" + originalScheduleId
|
|
|
|
// Store any existing cancellation function to call after we've updated the map
|
|
var cancelExisting context.CancelFunc
|
|
|
|
// Check if there's an existing schedule with the same ID, we'll cancel it after updating the map
|
|
if existingSchedule, ok := s.schedules[internalScheduleId]; ok {
|
|
log.Debug("Replacing existing schedule with same ID", "plugin", pluginID, "scheduleID", originalScheduleId)
|
|
|
|
// Store cancel information but don't call it yet
|
|
if existingSchedule.Type == ScheduleTypeOneTime && existingSchedule.Cancel != nil {
|
|
// We'll set the Cancel to nil to prevent the old job from removing the new one
|
|
cancelExisting = existingSchedule.Cancel
|
|
existingSchedule.Cancel = nil
|
|
} else if existingSchedule.Type == ScheduleTypeRecurring {
|
|
existingRecurringEntryID := existingSchedule.EntryID
|
|
if existingRecurringEntryID != 0 {
|
|
s.navidSched.Remove(existingRecurringEntryID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create the callback object
|
|
callback := &ScheduledCallback{
|
|
ID: originalScheduleId,
|
|
PluginID: pluginID,
|
|
Type: jobType,
|
|
Payload: payload,
|
|
}
|
|
|
|
return internalScheduleId, callback, cancelExisting, nil
|
|
}
|
|
|
|
// scheduleOneTime registers a new one-time scheduled job
|
|
func (s *schedulerService) scheduleOneTime(_ context.Context, pluginID string, req *scheduler.ScheduleOneTimeRequest) (*scheduler.ScheduleResponse, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
internalScheduleId, callback, cancelExisting, err := s.scheduleJob(pluginID, req.ScheduleId, ScheduleTypeOneTime, req.Payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create a context with cancel for this one-time schedule
|
|
scheduleCtx, cancel := context.WithCancel(context.Background())
|
|
callback.Cancel = cancel
|
|
|
|
// Store the callback info
|
|
s.schedules[internalScheduleId] = callback
|
|
|
|
// Now that the new job is in the map, we can safely cancel the old one
|
|
if cancelExisting != nil {
|
|
// Cancel in a goroutine to avoid deadlock since we're already holding the lock
|
|
go cancelExisting()
|
|
}
|
|
|
|
log.Debug("One-time schedule registered", "plugin", pluginID, "scheduleID", callback.ID, "internalID", internalScheduleId)
|
|
|
|
// Start the timer goroutine with the internal ID
|
|
go s.runOneTimeSchedule(scheduleCtx, internalScheduleId, time.Duration(req.DelaySeconds)*time.Second)
|
|
|
|
// Return the original ID to the plugin
|
|
return &scheduler.ScheduleResponse{
|
|
ScheduleId: callback.ID,
|
|
}, nil
|
|
}
|
|
|
|
// scheduleRecurring registers a new recurring scheduled job
|
|
func (s *schedulerService) scheduleRecurring(_ context.Context, pluginID string, req *scheduler.ScheduleRecurringRequest) (*scheduler.ScheduleResponse, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
internalScheduleId, callback, cancelExisting, err := s.scheduleJob(pluginID, req.ScheduleId, ScheduleTypeRecurring, req.Payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Schedule the job with the Navidrome scheduler
|
|
entryID, err := s.navidSched.Add(req.CronExpression, func() {
|
|
s.executeCallback(context.Background(), internalScheduleId, true)
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to schedule recurring job: %w", err)
|
|
}
|
|
|
|
// Store the entry ID so we can cancel it later
|
|
callback.EntryID = entryID
|
|
|
|
// Store the callback info
|
|
s.schedules[internalScheduleId] = callback
|
|
|
|
// Now that the new job is in the map, we can safely cancel the old one
|
|
if cancelExisting != nil {
|
|
// Cancel in a goroutine to avoid deadlock since we're already holding the lock
|
|
go cancelExisting()
|
|
}
|
|
|
|
log.Debug("Recurring schedule registered", "plugin", pluginID, "scheduleID", callback.ID, "internalID", internalScheduleId, "cron", req.CronExpression)
|
|
|
|
// Return the original ID to the plugin
|
|
return &scheduler.ScheduleResponse{
|
|
ScheduleId: callback.ID,
|
|
}, nil
|
|
}
|
|
|
|
// cancelSchedule cancels a scheduled job (either one-time or recurring)
|
|
func (s *schedulerService) cancelSchedule(_ context.Context, pluginID string, req *scheduler.CancelRequest) (*scheduler.CancelResponse, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
internalScheduleId := pluginID + ":" + req.ScheduleId
|
|
callback, exists := s.schedules[internalScheduleId]
|
|
if !exists {
|
|
return &scheduler.CancelResponse{
|
|
Success: false,
|
|
Error: "schedule not found",
|
|
}, nil
|
|
}
|
|
|
|
// Store the cancel functions to call after we've updated the schedule map
|
|
var cancelFunc context.CancelFunc
|
|
var recurringEntryID int
|
|
|
|
// Store cancel information but don't call it yet
|
|
if callback.Type == ScheduleTypeOneTime && callback.Cancel != nil {
|
|
cancelFunc = callback.Cancel
|
|
callback.Cancel = nil // Set to nil to prevent the cancel handler from removing the job
|
|
} else if callback.Type == ScheduleTypeRecurring {
|
|
recurringEntryID = callback.EntryID
|
|
}
|
|
|
|
// First remove from the map
|
|
delete(s.schedules, internalScheduleId)
|
|
|
|
// Now perform the cancellation safely
|
|
if cancelFunc != nil {
|
|
// Execute in a goroutine to avoid deadlock since we're already holding the lock
|
|
go cancelFunc()
|
|
}
|
|
if recurringEntryID != 0 {
|
|
s.navidSched.Remove(recurringEntryID)
|
|
}
|
|
|
|
log.Debug("Schedule canceled", "plugin", pluginID, "scheduleID", req.ScheduleId, "internalID", internalScheduleId, "type", callback.Type)
|
|
|
|
return &scheduler.CancelResponse{
|
|
Success: true,
|
|
}, nil
|
|
}
|
|
|
|
// runOneTimeSchedule handles the one-time schedule execution and callback
|
|
func (s *schedulerService) runOneTimeSchedule(ctx context.Context, internalScheduleId string, delay time.Duration) {
|
|
tmr := time.NewTimer(delay)
|
|
defer tmr.Stop()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
// Schedule was cancelled via its context
|
|
// We're no longer removing the schedule here because that's handled by the code that
|
|
// cancelled the context
|
|
log.Debug("One-time schedule context canceled", "internalID", internalScheduleId)
|
|
return
|
|
|
|
case <-tmr.C:
|
|
// Timer fired, execute the callback
|
|
s.executeCallback(ctx, internalScheduleId, false)
|
|
}
|
|
}
|
|
|
|
// executeCallback calls the plugin's OnSchedulerCallback method
|
|
func (s *schedulerService) executeCallback(ctx context.Context, internalScheduleId string, isRecurring bool) {
|
|
s.mu.Lock()
|
|
callback := s.schedules[internalScheduleId]
|
|
// Only remove one-time schedules from the map after execution
|
|
if callback != nil && callback.Type == ScheduleTypeOneTime {
|
|
delete(s.schedules, internalScheduleId)
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
if callback == nil {
|
|
log.Error("Schedule not found for callback", "internalID", internalScheduleId)
|
|
return
|
|
}
|
|
|
|
ctx = log.NewContext(ctx, "plugin", callback.PluginID, "scheduleID", callback.ID, "type", callback.Type)
|
|
log.Debug("Executing schedule callback")
|
|
start := time.Now()
|
|
|
|
// Get the plugin
|
|
p := s.manager.LoadPlugin(callback.PluginID, CapabilitySchedulerCallback)
|
|
if p == nil {
|
|
log.Error("Plugin not found for callback", "plugin", callback.PluginID)
|
|
return
|
|
}
|
|
|
|
// Type-check the plugin
|
|
plugin, ok := p.(*wasmSchedulerCallback)
|
|
if !ok {
|
|
log.Error("Plugin does not implement SchedulerCallback", "plugin", callback.PluginID)
|
|
return
|
|
}
|
|
|
|
// Call the plugin's OnSchedulerCallback method
|
|
log.Trace(ctx, "Executing schedule callback")
|
|
err := plugin.OnSchedulerCallback(ctx, callback.ID, callback.Payload, isRecurring)
|
|
if err != nil {
|
|
log.Error("Error executing schedule callback", "elapsed", time.Since(start), err)
|
|
return
|
|
}
|
|
log.Debug("Schedule callback executed", "elapsed", time.Since(start))
|
|
}
|