diff --git a/core/agents/mcp/mcp_agent.go b/core/agents/mcp/mcp_agent.go
index 31c67422e..06398f80a 100644
--- a/core/agents/mcp/mcp_agent.go
+++ b/core/agents/mcp/mcp_agent.go
@@ -2,10 +2,14 @@ package mcp
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"io"
 	"os"
 	"os/exec"
 	"strings"
+	"sync"
+	"time"
 
 	mcp "github.com/metoro-io/mcp-golang"
 	"github.com/metoro-io/mcp-golang/transport/stdio"
@@ -23,8 +27,15 @@ const (
 )
 
 // MCPAgent interacts with an external MCP server for metadata retrieval.
+// It keeps a single instance of the server process running.
 type MCPAgent struct {
-	// ds model.DataStore // Not needed for this PoC
+	mu       sync.Mutex // Serializes tool calls; guards stdin, client and cmd against the process watcher
+	initOnce sync.Once  // Runs the server start-up and MCP handshake at most once
+	initErr  error      // Stores initialization error
+
+	cmd    *exec.Cmd
+	stdin  io.WriteCloser
+	client *mcp.Client
 }
 
 func mcpConstructor(ds model.DataStore) agents.Interface {
@@ -33,7 +44,8 @@ func mcpConstructor(ds model.DataStore) agents.Interface {
 		log.Warn("MCP server executable not found, disabling agent", "path", mcpServerPath, "error", err)
 		return nil
 	}
-	log.Info("MCP Agent initialized", "serverPath", mcpServerPath)
+	log.Info("MCP Agent created, server will be started on first request", "serverPath", mcpServerPath)
+	// Initialize the struct, but don't start the process yet.
 	return &MCPAgent{}
 }
 
@@ -41,6 +53,106 @@ func (a *MCPAgent) AgentName() string {
 	return mcpAgentName
 }
 
+// ensureClientInitialized starts the MCP server process and initializes the client exactly once.
+//
+// The first call spawns the server, connects a stdio transport to its pipes and runs the MCP
+// handshake under a 10 second timeout that is independent of ctx (ctx is only used for logging).
+// The outcome is cached by initOnce: later calls return the same a.initErr and never retry.
+func (a *MCPAgent) ensureClientInitialized(ctx context.Context) error {
+	a.initOnce.Do(func() {
+		log.Info(ctx, "Initializing MCP client and starting server process...", "serverPath", mcpServerPath)
+		// Use background context for the command itself, so it doesn't get cancelled by the request context.
+		// We manage its lifecycle independently.
+		cmd := exec.CommandContext(context.Background(), mcpServerPath)
+
+		stdin, err := cmd.StdinPipe()
+		if err != nil {
+			a.initErr = fmt.Errorf("failed to get stdin pipe for MCP server: %w", err)
+			log.Error(ctx, "MCP init failed", "error", a.initErr)
+			return
+		}
+
+		stdout, err := cmd.StdoutPipe()
+		if err != nil {
+			a.initErr = fmt.Errorf("failed to get stdout pipe for MCP server: %w", err)
+			_ = stdin.Close() // Clean up stdin pipe
+			log.Error(ctx, "MCP init failed", "error", a.initErr)
+			return
+		}
+
+		// Capture stderr for debugging
+		// NOTE(review): this buffer is never trimmed while the process is alive — consider bounding it.
+		var stderr strings.Builder
+		cmd.Stderr = &stderr
+
+		if err := cmd.Start(); err != nil {
+			a.initErr = fmt.Errorf("failed to start MCP server process: %w", err)
+			_ = stdin.Close()
+			_ = stdout.Close()
+			log.Error(ctx, "MCP init failed", "error", a.initErr)
+			return
+		}
+
+		a.cmd = cmd
+		a.stdin = stdin
+		log.Info(ctx, "MCP server process started", "pid", cmd.Process.Pid)
+
+		// exited is guarded by a.mu. The watcher sets it once the process is gone so that the
+		// code below never publishes a client for a server that has already died.
+		exited := false
+
+		// Start a goroutine to wait for the process and log if it exits unexpectedly
+		go func() {
+			err := cmd.Wait()
+			log.Warn("MCP server process exited", "pid", cmd.Process.Pid, "error", err, "stderr", stderr.String())
+			// Maybe add logic here to attempt restart or mark the agent as unhealthy?
+			// For PoC, just log it.
+			// Ensure pipes are closed if the process exits
+			a.mu.Lock()
+			exited = true
+			if a.stdin != nil {
+				_ = a.stdin.Close()
+				a.stdin = nil
+			}
+			// stdout is typically closed by the StdioServerTransport when done reading.
+			a.client = nil // Mark client as unusable
+			a.cmd = nil
+			a.mu.Unlock()
+		}()
+
+		// Create and initialize the MCP client
+		transport := stdio.NewStdioServerTransportWithIO(stdout, stdin)
+		client := mcp.NewClient(transport)
+
+		// Use a specific context for initialization, not the potentially short-lived request context
+		initCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Example timeout
+		defer cancel()
+		if _, err := client.Initialize(initCtx); err != nil {
+			a.initErr = fmt.Errorf("failed to initialize MCP client: %w", err)
+			log.Error(ctx, "MCP client initialization failed", "error", a.initErr)
+			// Attempt to kill the process we just started
+			if killErr := cmd.Process.Kill(); killErr != nil {
+				log.Error(ctx, "Failed to kill MCP server process after init failure", "pid", cmd.Process.Pid, "error", killErr)
+			}
+			return
+		}
+
+		// Publish the client under a.mu: the watcher goroutine writes a.client under the same
+		// lock, and an unsynchronized store here would race with it.
+		a.mu.Lock()
+		if exited {
+			a.mu.Unlock()
+			a.initErr = errors.New("MCP server process exited during initialization")
+			log.Error(ctx, "MCP init failed", "error", a.initErr)
+			return
+		}
+		a.client = client
+		a.mu.Unlock()
+		log.Info(ctx, "MCP client initialized successfully")
+	})
+	return a.initErr
+}
+
 // getArtistBiographyArgs defines the structure for the MCP tool arguments.
 // IMPORTANT: Field names MUST be exported (start with uppercase) for JSON marshalling,
 // but the `json` tags determine the actual field names sent over MCP.
@@ -52,56 +164,25 @@ type getArtistBiographyArgs struct {
 
 // GetArtistBiography retrieves the artist biography by calling the external MCP server.
func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) { + // Ensure the client is initialized and the server is running + if err := a.ensureClientInitialized(ctx); err != nil { + log.Error(ctx, "MCP client not initialized, cannot get biography", "error", err) + // Wrap initErr with %w so callers can still unwrap the original init problem + return "", fmt.Errorf("MCP agent not initialized: %w", err) + } + + // Lock to ensure only one request uses the client/pipes at a time + a.mu.Lock() + defer a.mu.Unlock() + + // Check if the client is still valid (it might have been nilled if the process died) + if a.client == nil { + log.Error(ctx, "MCP client is not valid (server process likely died)") + return "", fmt.Errorf("MCP agent process is not running") + } + log.Debug(ctx, "Calling MCP agent GetArtistBiography", "id", id, "name", name, "mbid", mbid) - // Prepare command with context for cancellation handling - cmd := exec.CommandContext(ctx, mcpServerPath) - - // Get pipes for stdin and stdout - stdin, err := cmd.StdinPipe() - if err != nil { - log.Error(ctx, "Failed to get stdin pipe for MCP server", "error", err) - return "", fmt.Errorf("failed to get stdin pipe: %w", err) - } - stdout, err := cmd.StdoutPipe() - if err != nil { - log.Error(ctx, "Failed to get stdout pipe for MCP server", "error", err) - return "", fmt.Errorf("failed to get stdout pipe: %w", err) - } - // Capture stderr for debugging, but don't block on it - var stderr strings.Builder - cmd.Stderr = &stderr - - // Start the MCP server process - if err := cmd.Start(); err != nil { - log.Error(ctx, "Failed to start MCP server process", "path", mcpServerPath, "error", err) - return "", fmt.Errorf("failed to start MCP server: %w", err) - } - // Ensure the process is killed eventually - defer func() { - if err := cmd.Process.Kill(); err != nil { - log.Error(ctx, "Failed to kill MCP server process", "pid", cmd.Process.Pid, "error", err) - } else { - 
log.Debug(ctx, "Killed MCP server process", "pid", cmd.Process.Pid) - } - // Log stderr if it contains anything - if stderr.Len() > 0 { - log.Warn(ctx, "MCP server stderr output", "pid", cmd.Process.Pid, "stderr", stderr.String()) - } - }() - - log.Debug(ctx, "MCP server process started", "pid", cmd.Process.Pid) - - // Create and initialize the MCP client - transport := stdio.NewStdioServerTransportWithIO(stdout, stdin) - client := mcp.NewClient(transport) - - if _, err := client.Initialize(ctx); err != nil { - log.Error(ctx, "Failed to initialize MCP client", "error", err) - return "", fmt.Errorf("failed to initialize MCP client: %w", err) - } - log.Debug(ctx, "MCP client initialized") - // Prepare arguments for the tool call args := getArtistBiographyArgs{ ID: id, @@ -109,18 +170,26 @@ func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string Mbid: mbid, } - // Call the tool + // Call the tool using the persistent client log.Debug(ctx, "Calling MCP tool", "tool", mcpToolName, "args", args) - response, err := client.CallTool(ctx, mcpToolName, args) + response, err := a.client.CallTool(ctx, mcpToolName, args) if err != nil { + // Handle potential pipe closures or other communication errors log.Error(ctx, "Failed to call MCP tool", "tool", mcpToolName, "error", err) + // Check if the error indicates a broken pipe, suggesting the server died + if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") || strings.Contains(err.Error(), "EOF") { + log.Warn(ctx, "MCP tool call failed, possibly due to server process exit") + // Attempt to reset for next time? Or just fail? + // For now, just return a clear error. 
+ return "", fmt.Errorf("MCP agent process communication error: %w", err) + } return "", fmt.Errorf("failed to call MCP tool '%s': %w", mcpToolName, err) } // Process the response if response == nil || len(response.Content) == 0 || response.Content[0].TextContent == nil { log.Warn(ctx, "MCP tool returned empty or invalid response", "tool", mcpToolName) - return "", agents.ErrNotFound // Or a more specific error? + return "", agents.ErrNotFound } bio := response.Content[0].TextContent.Text