mirror of
https://github.com/navidrome/navidrome.git
synced 2025-05-08 14:21:09 +03:00
feat: implement MCP agent process auto-restart
Modify the MCPAgent to automatically attempt restarting the external
server process if it detects the process has exited.

- Replaced sync.Once with mutex-protected checks (a.client == nil) to allow
  re-initialization.
- The monitoring goroutine now cleans up agent state (nils client, cmd, stdin)
  upon process exit, signaling the need for restart.
- ensureClientInitialized now attempts to start/initialize if the client is nil.
- GetArtistBiography checks client validity again after locking to handle race
  conditions where the process might die just after initialization check.
This commit is contained in:
parent
8326a20eda
commit
8b754a7c73
@ -24,14 +24,16 @@ const (
|
|||||||
// Hardcoded path for PoC
|
// Hardcoded path for PoC
|
||||||
mcpServerPath = "/Users/deluan/Development/navidrome/plugins-mcp/mcp-server"
|
mcpServerPath = "/Users/deluan/Development/navidrome/plugins-mcp/mcp-server"
|
||||||
mcpToolName = "get_artist_biography"
|
mcpToolName = "get_artist_biography"
|
||||||
|
// TODO: Add configuration for restart delays
|
||||||
|
initializationTimeout = 10 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// MCPAgent interacts with an external MCP server for metadata retrieval.
|
// MCPAgent interacts with an external MCP server for metadata retrieval.
|
||||||
// It keeps a single instance of the server process running.
|
// It keeps a single instance of the server process running and attempts restart on failure.
|
||||||
type MCPAgent struct {
|
type MCPAgent struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
initOnce sync.Once
|
// No longer using sync.Once to allow re-initialization
|
||||||
initErr error // Stores initialization error
|
// initErr error // Error state managed implicitly by client being nil
|
||||||
|
|
||||||
cmd *exec.Cmd
|
cmd *exec.Cmd
|
||||||
stdin io.WriteCloser
|
stdin io.WriteCloser
|
||||||
@ -45,7 +47,6 @@ func mcpConstructor(ds model.DataStore) agents.Interface {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.Info("MCP Agent created, server will be started on first request", "serverPath", mcpServerPath)
|
log.Info("MCP Agent created, server will be started on first request", "serverPath", mcpServerPath)
|
||||||
// Initialize the struct, but don't start the process yet.
|
|
||||||
return &MCPAgent{}
|
return &MCPAgent{}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,84 +54,115 @@ func (a *MCPAgent) AgentName() string {
|
|||||||
return mcpAgentName
|
return mcpAgentName
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensureClientInitialized starts the MCP server process and initializes the client exactly once.
|
// ensureClientInitialized starts the MCP server process and initializes the client if needed.
|
||||||
func (a *MCPAgent) ensureClientInitialized(ctx context.Context) error {
|
// It now attempts restart if the client is found to be nil.
|
||||||
a.initOnce.Do(func() {
|
func (a *MCPAgent) ensureClientInitialized(ctx context.Context) (err error) {
|
||||||
log.Info(ctx, "Initializing MCP client and starting server process...", "serverPath", mcpServerPath)
|
a.mu.Lock()
|
||||||
// Use background context for the command itself, so it doesn't get cancelled by the request context.
|
// If client is already initialized and valid, we're done.
|
||||||
// We manage its lifecycle independently.
|
if a.client != nil {
|
||||||
cmd := exec.CommandContext(context.Background(), mcpServerPath)
|
a.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Unlock after the check, as the rest of the function needs the lock.
|
||||||
|
a.mu.Unlock()
|
||||||
|
|
||||||
stdin, err := cmd.StdinPipe()
|
// --- Attempt initialization/restart ---
|
||||||
if err != nil {
|
log.Info(ctx, "Initializing MCP client and starting/restarting server process...", "serverPath", mcpServerPath)
|
||||||
a.initErr = fmt.Errorf("failed to get stdin pipe for MCP server: %w", err)
|
|
||||||
log.Error(ctx, "MCP init failed", "error", a.initErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
stdout, err := cmd.StdoutPipe()
|
// Use background context for the command itself, so it doesn't get cancelled by the request context.
|
||||||
if err != nil {
|
cmd := exec.CommandContext(context.Background(), mcpServerPath)
|
||||||
a.initErr = fmt.Errorf("failed to get stdout pipe for MCP server: %w", err)
|
|
||||||
_ = stdin.Close() // Clean up stdin pipe
|
|
||||||
log.Error(ctx, "MCP init failed", "error", a.initErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Capture stderr for debugging
|
var stdin io.WriteCloser
|
||||||
var stderr strings.Builder
|
var stdout io.ReadCloser
|
||||||
cmd.Stderr = &stderr
|
var stderr strings.Builder
|
||||||
|
|
||||||
if err := cmd.Start(); err != nil {
|
stdin, err = cmd.StdinPipe()
|
||||||
a.initErr = fmt.Errorf("failed to start MCP server process: %w", err)
|
if err != nil {
|
||||||
_ = stdin.Close()
|
err = fmt.Errorf("failed to get stdin pipe for MCP server: %w", err)
|
||||||
_ = stdout.Close()
|
log.Error(ctx, "MCP init/restart failed", "error", err)
|
||||||
log.Error(ctx, "MCP init failed", "error", a.initErr)
|
return err // Return error directly
|
||||||
return
|
}
|
||||||
}
|
|
||||||
|
|
||||||
a.cmd = cmd
|
stdout, err = cmd.StdoutPipe()
|
||||||
a.stdin = stdin
|
if err != nil {
|
||||||
log.Info(ctx, "MCP server process started", "pid", cmd.Process.Pid)
|
err = fmt.Errorf("failed to get stdout pipe for MCP server: %w", err)
|
||||||
|
_ = stdin.Close() // Clean up stdin pipe
|
||||||
|
log.Error(ctx, "MCP init/restart failed", "error", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Start a goroutine to wait for the process and log if it exits unexpectedly
|
cmd.Stderr = &stderr
|
||||||
go func() {
|
|
||||||
err := cmd.Wait()
|
if err = cmd.Start(); err != nil {
|
||||||
log.Warn("MCP server process exited", "pid", cmd.Process.Pid, "error", err, "stderr", stderr.String())
|
err = fmt.Errorf("failed to start MCP server process: %w", err)
|
||||||
// Maybe add logic here to attempt restart or mark the agent as unhealthy?
|
_ = stdin.Close()
|
||||||
// For PoC, just log it.
|
_ = stdout.Close()
|
||||||
// Ensure pipes are closed if the process exits
|
log.Error(ctx, "MCP init/restart failed", "error", err)
|
||||||
a.mu.Lock()
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
currentPid := cmd.Process.Pid
|
||||||
|
log.Info(ctx, "MCP server process started/restarted", "pid", currentPid)
|
||||||
|
|
||||||
|
// --- Start monitoring goroutine for *this* process ---
|
||||||
|
go func(processCmd *exec.Cmd, processStderr *strings.Builder, processPid int) {
|
||||||
|
waitErr := processCmd.Wait()
|
||||||
|
// Lock immediately after Wait returns to update state atomically
|
||||||
|
a.mu.Lock()
|
||||||
|
log.Warn("MCP server process exited", "pid", processPid, "error", waitErr, "stderr", processStderr.String())
|
||||||
|
// Clean up state only if this is still the *current* process
|
||||||
|
// (to avoid race condition if a quick restart happened)
|
||||||
|
if a.cmd == processCmd {
|
||||||
if a.stdin != nil {
|
if a.stdin != nil {
|
||||||
_ = a.stdin.Close()
|
_ = a.stdin.Close()
|
||||||
a.stdin = nil
|
a.stdin = nil
|
||||||
}
|
}
|
||||||
// stdout is typically closed by the StdioServerTransport when done reading.
|
a.client = nil // Mark client as unusable, triggering restart on next call
|
||||||
a.client = nil // Mark client as unusable
|
|
||||||
a.cmd = nil
|
a.cmd = nil
|
||||||
a.mu.Unlock()
|
log.Info("MCP agent state cleaned up after process exit", "pid", processPid)
|
||||||
}()
|
} else {
|
||||||
|
log.Debug("MCP agent process exited, but state already updated by newer process", "exitedPid", processPid)
|
||||||
// Create and initialize the MCP client
|
|
||||||
transport := stdio.NewStdioServerTransportWithIO(stdout, stdin)
|
|
||||||
client := mcp.NewClient(transport)
|
|
||||||
|
|
||||||
// Use a specific context for initialization, not the potentially short-lived request context
|
|
||||||
initCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Example timeout
|
|
||||||
defer cancel()
|
|
||||||
if _, err := client.Initialize(initCtx); err != nil {
|
|
||||||
a.initErr = fmt.Errorf("failed to initialize MCP client: %w", err)
|
|
||||||
log.Error(ctx, "MCP client initialization failed", "error", a.initErr)
|
|
||||||
// Attempt to kill the process we just started
|
|
||||||
if killErr := cmd.Process.Kill(); killErr != nil {
|
|
||||||
log.Error(ctx, "Failed to kill MCP server process after init failure", "pid", cmd.Process.Pid, "error", killErr)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
a.mu.Unlock()
|
||||||
|
}(cmd, &stderr, currentPid) // Pass copies/values to the goroutine
|
||||||
|
|
||||||
a.client = client
|
// --- Initialize MCP client ---
|
||||||
log.Info(ctx, "MCP client initialized successfully")
|
transport := stdio.NewStdioServerTransportWithIO(stdout, stdin) // Use the pipes from this attempt
|
||||||
})
|
client := mcp.NewClient(transport)
|
||||||
return a.initErr
|
|
||||||
|
initCtx, cancel := context.WithTimeout(context.Background(), initializationTimeout)
|
||||||
|
defer cancel()
|
||||||
|
if _, err = client.Initialize(initCtx); err != nil {
|
||||||
|
err = fmt.Errorf("failed to initialize MCP client: %w", err)
|
||||||
|
log.Error(ctx, "MCP client initialization failed after process start", "pid", currentPid, "error", err)
|
||||||
|
// Attempt to kill the process we just started, as client init failed
|
||||||
|
if killErr := cmd.Process.Kill(); killErr != nil {
|
||||||
|
log.Error(ctx, "Failed to kill MCP server process after init failure", "pid", currentPid, "error", killErr)
|
||||||
|
}
|
||||||
|
return err // Return the initialization error
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Initialization successful, update agent state ---
|
||||||
|
a.mu.Lock() // Lock again to update agent state
|
||||||
|
// Double-check if another goroutine initialized successfully in the meantime
|
||||||
|
// Although unlikely with the outer lock/check, it's safer.
|
||||||
|
if a.client != nil {
|
||||||
|
a.mu.Unlock()
|
||||||
|
log.Warn(ctx, "MCP client was already initialized by another routine, discarding this attempt", "pid", currentPid)
|
||||||
|
// Kill the redundant process we started
|
||||||
|
if killErr := cmd.Process.Kill(); killErr != nil {
|
||||||
|
log.Error(ctx, "Failed to kill redundant MCP server process", "pid", currentPid, "error", killErr)
|
||||||
|
}
|
||||||
|
return nil // Return success as *a* client is available
|
||||||
|
}
|
||||||
|
|
||||||
|
a.cmd = cmd // Store the successfully started command
|
||||||
|
a.stdin = stdin // Store its stdin
|
||||||
|
a.client = client // Store the successfully initialized client
|
||||||
|
a.mu.Unlock()
|
||||||
|
|
||||||
|
log.Info(ctx, "MCP client initialized successfully", "pid", currentPid)
|
||||||
|
return nil // Success
|
||||||
}
|
}
|
||||||
|
|
||||||
// getArtistBiographyArgs defines the structure for the MCP tool arguments.
|
// getArtistBiographyArgs defines the structure for the MCP tool arguments.
|
||||||
@ -144,23 +176,27 @@ type getArtistBiographyArgs struct {
|
|||||||
|
|
||||||
// GetArtistBiography retrieves the artist biography by calling the external MCP server.
|
// GetArtistBiography retrieves the artist biography by calling the external MCP server.
|
||||||
func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) {
|
func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) {
|
||||||
// Ensure the client is initialized and the server is running
|
// Ensure the client is initialized and the server is running (attempts restart if needed)
|
||||||
if err := a.ensureClientInitialized(ctx); err != nil {
|
if err := a.ensureClientInitialized(ctx); err != nil {
|
||||||
log.Error(ctx, "MCP client not initialized, cannot get biography", "error", err)
|
log.Error(ctx, "MCP agent initialization/restart failed, cannot get biography", "error", err)
|
||||||
// Don't wrap the initErr, return it directly so callers see the original init problem
|
return "", fmt.Errorf("MCP agent not ready: %w", err)
|
||||||
return "", fmt.Errorf("MCP agent not initialized: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock to ensure only one request uses the client/pipes at a time
|
// Lock to ensure only one request uses the client/pipes at a time
|
||||||
a.mu.Lock()
|
a.mu.Lock()
|
||||||
defer a.mu.Unlock()
|
|
||||||
|
|
||||||
// Check if the client is still valid (it might have been nilled if the process died)
|
// Check if the client is still valid *after* ensuring initialization and acquiring lock.
|
||||||
|
// The monitoring goroutine could have nilled it out if the process died just now.
|
||||||
if a.client == nil {
|
if a.client == nil {
|
||||||
log.Error(ctx, "MCP client is not valid (server process likely died)")
|
a.mu.Unlock() // Release lock before returning error
|
||||||
|
log.Error(ctx, "MCP client became invalid after initialization check (server process likely died)")
|
||||||
return "", fmt.Errorf("MCP agent process is not running")
|
return "", fmt.Errorf("MCP agent process is not running")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Keep a reference to the client while locked
|
||||||
|
currentClient := a.client
|
||||||
|
a.mu.Unlock() // Release lock before making the potentially blocking MCP call
|
||||||
|
|
||||||
log.Debug(ctx, "Calling MCP agent GetArtistBiography", "id", id, "name", name, "mbid", mbid)
|
log.Debug(ctx, "Calling MCP agent GetArtistBiography", "id", id, "name", name, "mbid", mbid)
|
||||||
|
|
||||||
// Prepare arguments for the tool call
|
// Prepare arguments for the tool call
|
||||||
@ -170,17 +206,16 @@ func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string
|
|||||||
Mbid: mbid,
|
Mbid: mbid,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call the tool using the persistent client
|
// Call the tool using the client reference
|
||||||
log.Debug(ctx, "Calling MCP tool", "tool", mcpToolName, "args", args)
|
log.Debug(ctx, "Calling MCP tool", "tool", mcpToolName, "args", args)
|
||||||
response, err := a.client.CallTool(ctx, mcpToolName, args)
|
response, err := currentClient.CallTool(ctx, mcpToolName, args) // Use currentClient
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Handle potential pipe closures or other communication errors
|
// Handle potential pipe closures or other communication errors
|
||||||
log.Error(ctx, "Failed to call MCP tool", "tool", mcpToolName, "error", err)
|
log.Error(ctx, "Failed to call MCP tool", "tool", mcpToolName, "error", err)
|
||||||
// Check if the error indicates a broken pipe, suggesting the server died
|
// Check if the error indicates a broken pipe, suggesting the server died
|
||||||
if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") || strings.Contains(err.Error(), "EOF") {
|
if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") || strings.Contains(err.Error(), "EOF") {
|
||||||
log.Warn(ctx, "MCP tool call failed, possibly due to server process exit")
|
log.Warn(ctx, "MCP tool call failed, possibly due to server process exit. State will be reset.")
|
||||||
// Attempt to reset for next time? Or just fail?
|
// State reset is handled by the monitoring goroutine, just return error
|
||||||
// For now, just return a clear error.
|
|
||||||
return "", fmt.Errorf("MCP agent process communication error: %w", err)
|
return "", fmt.Errorf("MCP agent process communication error: %w", err)
|
||||||
}
|
}
|
||||||
return "", fmt.Errorf("failed to call MCP tool '%s': %w", mcpToolName, err)
|
return "", fmt.Errorf("failed to call MCP tool '%s': %w", mcpToolName, err)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user