# Review summary (commentary only; everything from the first "diff --git" onward is unchanged).
#
# Patch to core/agents/mcp/mcp_agent.go that lets the MCP agent re-initialize its
# server process instead of initializing exactly once.
#
# What the hunks do:
#   * const block: adds initializationTimeout (10 * time.Second) and a TODO about
#     configuration for restart delays.
#   * MCPAgent struct: removes the initOnce and initErr fields; mu stays.
#   * mcpConstructor: removes one comment line; no code change.
#   * ensureClientInitialized: replaces the sync.Once body and switches to a named
#     result (err error). The new version returns nil when a.client is already set
#     (checked under a.mu). Otherwise it builds the command for mcpServerPath with
#     context.Background(), obtains the stdin/stdout pipes, points cmd.Stderr at a
#     strings.Builder, starts the process, and spawns a goroutine that calls Wait and
#     then, under a.mu, clears a.stdin / a.client / a.cmd only when a.cmd is still that
#     same command. client.Initialize runs with initializationTimeout; on failure the
#     process is killed and the wrapped error is returned. On success a.mu is taken
#     again: if a.client was set in the meantime the new process is killed and nil is
#     returned, otherwise cmd, stdin and client are stored on the agent.
#   * GetArtistBiography: copies a.client into currentClient under a.mu and unlocks
#     before CallTool, instead of holding the lock (via defer) for the whole call.
#
# NOTE(review): a.mu is released between the initial a.client check and the final
#   state update, so concurrent callers can each start a server process; the extra
#   ones are killed by the second a.client check. Confirm this is acceptable.
# NOTE(review): the monitor goroutine only resets state when a.cmd == processCmd, and
#   a.cmd is assigned only at the very end. If the process exits after Initialize
#   succeeds but before that assignment, cleanup is skipped and a client for an
#   already-exited process appears to be published with nothing left to clear it.
#   Confirm whether this window needs handling.
# NOTE(review): the unchanged context comment "Lock to ensure only one request uses
#   the client/pipes at a time" no longer matches the code, which now unlocks before
#   CallTool. Confirm the client tolerates concurrent calls, or update the comment in
#   the target file.
# NOTE(review): mcpServerPath is still a hard-coded absolute path ("Hardcoded path for PoC").
# NOTE(review): line breaks in this patch appear to have been collapsed (hunk headers
#   sit mid-line); they presumably need restoring before the patch can be applied.
#
diff --git a/core/agents/mcp/mcp_agent.go b/core/agents/mcp/mcp_agent.go index 06398f80a..8322c7bc8 100644 --- a/core/agents/mcp/mcp_agent.go +++ b/core/agents/mcp/mcp_agent.go @@ -24,14 +24,16 @@ const ( // Hardcoded path for PoC mcpServerPath = "/Users/deluan/Development/navidrome/plugins-mcp/mcp-server" mcpToolName = "get_artist_biography" + // TODO: Add configuration for restart delays + initializationTimeout = 10 * time.Second ) // MCPAgent interacts with an external MCP server for metadata retrieval. -// It keeps a single instance of the server process running. +// It keeps a single instance of the server process running and attempts restart on failure. type MCPAgent struct { - mu sync.Mutex - initOnce sync.Once - initErr error // Stores initialization error + mu sync.Mutex + // No longer using sync.Once to allow re-initialization + // initErr error // Error state managed implicitly by client being nil cmd *exec.Cmd stdin io.WriteCloser @@ -45,7 +47,6 @@ func mcpConstructor(ds model.DataStore) agents.Interface { return nil } log.Info("MCP Agent created, server will be started on first request", "serverPath", mcpServerPath) - // Initialize the struct, but don't start the process yet. return &MCPAgent{} } @@ -53,84 +54,115 @@ func (a *MCPAgent) AgentName() string { return mcpAgentName } -// ensureClientInitialized starts the MCP server process and initializes the client exactly once. -func (a *MCPAgent) ensureClientInitialized(ctx context.Context) error { - a.initOnce.Do(func() { - log.Info(ctx, "Initializing MCP client and starting server process...", "serverPath", mcpServerPath) - // Use background context for the command itself, so it doesn't get cancelled by the request context. - // We manage its lifecycle independently. - cmd := exec.CommandContext(context.Background(), mcpServerPath) +// ensureClientInitialized starts the MCP server process and initializes the client if needed. +// It now attempts restart if the client is found to be nil. 
+func (a *MCPAgent) ensureClientInitialized(ctx context.Context) (err error) { + a.mu.Lock() + // If client is already initialized and valid, we're done. + if a.client != nil { + a.mu.Unlock() + return nil + } + // Unlock after the check, as the rest of the function needs the lock. + a.mu.Unlock() - stdin, err := cmd.StdinPipe() - if err != nil { - a.initErr = fmt.Errorf("failed to get stdin pipe for MCP server: %w", err) - log.Error(ctx, "MCP init failed", "error", a.initErr) - return - } + // --- Attempt initialization/restart --- + log.Info(ctx, "Initializing MCP client and starting/restarting server process...", "serverPath", mcpServerPath) - stdout, err := cmd.StdoutPipe() - if err != nil { - a.initErr = fmt.Errorf("failed to get stdout pipe for MCP server: %w", err) - _ = stdin.Close() // Clean up stdin pipe - log.Error(ctx, "MCP init failed", "error", a.initErr) - return - } + // Use background context for the command itself, so it doesn't get cancelled by the request context. + cmd := exec.CommandContext(context.Background(), mcpServerPath) - // Capture stderr for debugging - var stderr strings.Builder - cmd.Stderr = &stderr + var stdin io.WriteCloser + var stdout io.ReadCloser + var stderr strings.Builder - if err := cmd.Start(); err != nil { - a.initErr = fmt.Errorf("failed to start MCP server process: %w", err) - _ = stdin.Close() - _ = stdout.Close() - log.Error(ctx, "MCP init failed", "error", a.initErr) - return - } + stdin, err = cmd.StdinPipe() + if err != nil { + err = fmt.Errorf("failed to get stdin pipe for MCP server: %w", err) + log.Error(ctx, "MCP init/restart failed", "error", err) + return err // Return error directly + } - a.cmd = cmd - a.stdin = stdin - log.Info(ctx, "MCP server process started", "pid", cmd.Process.Pid) + stdout, err = cmd.StdoutPipe() + if err != nil { + err = fmt.Errorf("failed to get stdout pipe for MCP server: %w", err) + _ = stdin.Close() // Clean up stdin pipe + log.Error(ctx, "MCP init/restart failed", "error", err) 
+ return err + } - // Start a goroutine to wait for the process and log if it exits unexpectedly - go func() { - err := cmd.Wait() - log.Warn("MCP server process exited", "pid", cmd.Process.Pid, "error", err, "stderr", stderr.String()) - // Maybe add logic here to attempt restart or mark the agent as unhealthy? - // For PoC, just log it. - // Ensure pipes are closed if the process exits - a.mu.Lock() + cmd.Stderr = &stderr + + if err = cmd.Start(); err != nil { + err = fmt.Errorf("failed to start MCP server process: %w", err) + _ = stdin.Close() + _ = stdout.Close() + log.Error(ctx, "MCP init/restart failed", "error", err) + return err + } + + currentPid := cmd.Process.Pid + log.Info(ctx, "MCP server process started/restarted", "pid", currentPid) + + // --- Start monitoring goroutine for *this* process --- + go func(processCmd *exec.Cmd, processStderr *strings.Builder, processPid int) { + waitErr := processCmd.Wait() + // Lock immediately after Wait returns to update state atomically + a.mu.Lock() + log.Warn("MCP server process exited", "pid", processPid, "error", waitErr, "stderr", processStderr.String()) + // Clean up state only if this is still the *current* process + // (to avoid race condition if a quick restart happened) + if a.cmd == processCmd { if a.stdin != nil { _ = a.stdin.Close() a.stdin = nil } - // stdout is typically closed by the StdioServerTransport when done reading. 
- a.client = nil // Mark client as unusable + a.client = nil // Mark client as unusable, triggering restart on next call a.cmd = nil - a.mu.Unlock() - }() - - // Create and initialize the MCP client - transport := stdio.NewStdioServerTransportWithIO(stdout, stdin) - client := mcp.NewClient(transport) - - // Use a specific context for initialization, not the potentially short-lived request context - initCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Example timeout - defer cancel() - if _, err := client.Initialize(initCtx); err != nil { - a.initErr = fmt.Errorf("failed to initialize MCP client: %w", err) - log.Error(ctx, "MCP client initialization failed", "error", a.initErr) - // Attempt to kill the process we just started - if killErr := cmd.Process.Kill(); killErr != nil { - log.Error(ctx, "Failed to kill MCP server process after init failure", "pid", cmd.Process.Pid, "error", killErr) - } - return + log.Info("MCP agent state cleaned up after process exit", "pid", processPid) + } else { + log.Debug("MCP agent process exited, but state already updated by newer process", "exitedPid", processPid) } + a.mu.Unlock() + }(cmd, &stderr, currentPid) // Pass copies/values to the goroutine - a.client = client - log.Info(ctx, "MCP client initialized successfully") - }) - return a.initErr + // --- Initialize MCP client --- + transport := stdio.NewStdioServerTransportWithIO(stdout, stdin) // Use the pipes from this attempt + client := mcp.NewClient(transport) + + initCtx, cancel := context.WithTimeout(context.Background(), initializationTimeout) + defer cancel() + if _, err = client.Initialize(initCtx); err != nil { + err = fmt.Errorf("failed to initialize MCP client: %w", err) + log.Error(ctx, "MCP client initialization failed after process start", "pid", currentPid, "error", err) + // Attempt to kill the process we just started, as client init failed + if killErr := cmd.Process.Kill(); killErr != nil { + log.Error(ctx, "Failed to kill MCP server 
process after init failure", "pid", currentPid, "error", killErr) + } + return err // Return the initialization error + } + + // --- Initialization successful, update agent state --- + a.mu.Lock() // Lock again to update agent state + // Double-check if another goroutine initialized successfully in the meantime + // Although unlikely with the outer lock/check, it's safer. + if a.client != nil { + a.mu.Unlock() + log.Warn(ctx, "MCP client was already initialized by another routine, discarding this attempt", "pid", currentPid) + // Kill the redundant process we started + if killErr := cmd.Process.Kill(); killErr != nil { + log.Error(ctx, "Failed to kill redundant MCP server process", "pid", currentPid, "error", killErr) + } + return nil // Return success as *a* client is available + } + + a.cmd = cmd // Store the successfully started command + a.stdin = stdin // Store its stdin + a.client = client // Store the successfully initialized client + a.mu.Unlock() + + log.Info(ctx, "MCP client initialized successfully", "pid", currentPid) + return nil // Success } // getArtistBiographyArgs defines the structure for the MCP tool arguments. @@ -144,23 +176,27 @@ type getArtistBiographyArgs struct { // GetArtistBiography retrieves the artist biography by calling the external MCP server. 
func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) { - // Ensure the client is initialized and the server is running + // Ensure the client is initialized and the server is running (attempts restart if needed) if err := a.ensureClientInitialized(ctx); err != nil { - log.Error(ctx, "MCP client not initialized, cannot get biography", "error", err) - // Don't wrap the initErr, return it directly so callers see the original init problem - return "", fmt.Errorf("MCP agent not initialized: %w", err) + log.Error(ctx, "MCP agent initialization/restart failed, cannot get biography", "error", err) + return "", fmt.Errorf("MCP agent not ready: %w", err) } // Lock to ensure only one request uses the client/pipes at a time a.mu.Lock() - defer a.mu.Unlock() - // Check if the client is still valid (it might have been nilled if the process died) + // Check if the client is still valid *after* ensuring initialization and acquiring lock. + // The monitoring goroutine could have nilled it out if the process died just now. 
if a.client == nil { - log.Error(ctx, "MCP client is not valid (server process likely died)") + a.mu.Unlock() // Release lock before returning error + log.Error(ctx, "MCP client became invalid after initialization check (server process likely died)") return "", fmt.Errorf("MCP agent process is not running") } + // Keep a reference to the client while locked + currentClient := a.client + a.mu.Unlock() // Release lock before making the potentially blocking MCP call + log.Debug(ctx, "Calling MCP agent GetArtistBiography", "id", id, "name", name, "mbid", mbid) // Prepare arguments for the tool call @@ -170,17 +206,16 @@ func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string Mbid: mbid, } - // Call the tool using the persistent client + // Call the tool using the client reference log.Debug(ctx, "Calling MCP tool", "tool", mcpToolName, "args", args) - response, err := a.client.CallTool(ctx, mcpToolName, args) + response, err := currentClient.CallTool(ctx, mcpToolName, args) // Use currentClient if err != nil { // Handle potential pipe closures or other communication errors log.Error(ctx, "Failed to call MCP tool", "tool", mcpToolName, "error", err) // Check if the error indicates a broken pipe, suggesting the server died if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") || strings.Contains(err.Error(), "EOF") { - log.Warn(ctx, "MCP tool call failed, possibly due to server process exit") - // Attempt to reset for next time? Or just fail? - // For now, just return a clear error. + log.Warn(ctx, "MCP tool call failed, possibly due to server process exit. State will be reset.") + // State reset is handled by the monitoring goroutine, just return error return "", fmt.Errorf("MCP agent process communication error: %w", err) } return "", fmt.Errorf("failed to call MCP tool '%s': %w", mcpToolName, err)