diff --git a/core/agents/mcp/mcp_agent.go b/core/agents/mcp/mcp_agent.go index d7f6eca3e..e1b5203ae 100644 --- a/core/agents/mcp/mcp_agent.go +++ b/core/agents/mcp/mcp_agent.go @@ -3,19 +3,15 @@ package mcp import ( "context" "errors" - "fmt" - "io" "os" - "os/exec" "path/filepath" "strings" - "sync" "time" - mcp "github.com/metoro-io/mcp-golang" - "github.com/metoro-io/mcp-golang/transport/stdio" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/api" + mcp "github.com/metoro-io/mcp-golang" // Needed for mcpClient interface types + "github.com/tetratelabs/wazero" // Needed for constructor + + // Needed for constructor types (api.Closer) "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" "github.com/navidrome/navidrome/conf" @@ -27,482 +23,176 @@ import ( // Exported constants for testing const ( McpAgentName = "mcp" - McpServerPath = "./core/agents/mcp/mcp-server/mcp-server.wasm" - McpToolNameGetBio = "get_artist_biography" - McpToolNameGetURL = "get_artist_url" - initializationTimeout = 10 * time.Second + initializationTimeout = 5 * time.Second + // McpServerPath defines the location of the MCP server executable or WASM module. + McpServerPath = "./core/agents/mcp/mcp-server/mcp-server.wasm" + McpToolNameGetBio = "get_artist_biography" + McpToolNameGetURL = "get_artist_url" ) // mcpClient interface matching the methods used from mcp.Client -// Allows for mocking in tests. +// Needs to be defined here as it's used by implementations type mcpClient interface { Initialize(ctx context.Context) (*mcp.InitializeResponse, error) CallTool(ctx context.Context, toolName string, args any) (*mcp.ToolResponse, error) } -// MCPAgent interacts with an external MCP server for metadata retrieval. -// It keeps a single instance of the server process running and attempts restart on failure. -// Supports both native executables and WASM modules (via Wazero). -type MCPAgent struct { - mu sync.Mutex - - // Runtime state - stdin io.WriteCloser - client mcpClient - cmd *exec.Cmd // Stores the native process command - wasmModule api.Module // Stores the instantiated WASM module - - // Shared Wazero resources (created once) - wasmRuntime api.Closer // Shared Wazero Runtime (implements Close(context.Context)) - wasmCache wazero.CompilationCache // Shared Compilation Cache (implements Close(context.Context)) - - // WASM resources per instance (cleaned up by monitoring goroutine) - wasmCompiled api.Closer // Stores the compiled WASM module for closing - - // ClientOverride allows injecting a mock client for testing. - // This field should ONLY be set in test code. - ClientOverride mcpClient +// mcpImplementation defines the common interface for both native and WASM MCP agents. +// This allows the main MCPAgent to delegate calls without knowing the underlying type. +type mcpImplementation interface { + agents.ArtistBiographyRetriever + agents.ArtistURLRetriever + Close() error // For cleaning up resources associated with this specific implementation. } +// MCPAgent is the public-facing agent registered with Navidrome. +// It acts as a wrapper around the actual implementation (native or WASM). +type MCPAgent struct { + // No mutex needed here if impl is set once at construction + // and the implementation handles its own internal state synchronization. + impl mcpImplementation + + // We might need a way to close shared resources later, but for now, + // the agent lifecycle is managed by the constructor and the impl.Close(). +} + +// mcpConstructor creates the appropriate MCP implementation (native or WASM) +// and wraps it in the MCPAgent. func mcpConstructor(ds model.DataStore) agents.Interface { - // Check if the MCP server executable exists + // Check if the MCP server executable/WASM exists if _, err := os.Stat(McpServerPath); os.IsNotExist(err) { log.Warn("MCP server executable/WASM not found, disabling agent", "path", McpServerPath, "error", err) return nil } - a := &MCPAgent{} + var agentImpl mcpImplementation + var err error - // If it's a WASM path, pre-initialize the shared Wazero runtime, cache, and host functions. if strings.HasSuffix(McpServerPath, ".wasm") { + log.Info("Configuring MCP agent for WASM execution", "path", McpServerPath) ctx := context.Background() // Use background context for setup + + // --- Setup Shared Wazero Resources --- + var cache wazero.CompilationCache cacheDir := filepath.Join(conf.Server.DataFolder, "cache", "wazero") - if err := os.MkdirAll(cacheDir, 0755); err != nil { - log.Error(ctx, "Failed to create Wazero cache directory, WASM caching disabled", "path", cacheDir, "error", err) + if errMkdir := os.MkdirAll(cacheDir, 0755); errMkdir != nil { + log.Error(ctx, "Failed to create Wazero cache directory, WASM caching disabled", "path", cacheDir, "error", errMkdir) } else { - cache, err := wazero.NewCompilationCacheWithDir(cacheDir) + cache, err = wazero.NewCompilationCacheWithDir(cacheDir) if err != nil { log.Error(ctx, "Failed to create Wazero compilation cache, WASM caching disabled", "path", cacheDir, "error", err) - } else { - a.wasmCache = cache - log.Info(ctx, "Wazero compilation cache enabled", "path", cacheDir) + cache = nil // Ensure cache is nil if creation failed } } - // Create runtime config, adding cache if it was created successfully + // Create runtime config, adding cache if it exists runtimeConfig := wazero.NewRuntimeConfig() - if a.wasmCache != nil { - runtimeConfig = runtimeConfig.WithCompilationCache(a.wasmCache) + if cache != nil { + runtimeConfig = runtimeConfig.WithCompilationCache(cache) } // Create the shared runtime runtime := wazero.NewRuntimeWithConfig(ctx, runtimeConfig) - // --- Register Host Functions --- Must happen BEFORE WASI instantiation if WASI needs them? - // Actually, WASI instantiation is separate from host func instantiation. - if err := registerHostFunctions(ctx, runtime); err != nil { - // Error already logged by registerHostFunctions + // Register host functions + if err = registerHostFunctions(ctx, runtime); err != nil { _ = runtime.Close(ctx) - if a.wasmCache != nil { - _ = a.wasmCache.Close(ctx) + if cache != nil { + _ = cache.Close(ctx) } - return nil + return nil // Fatal error } - // --- End Host Function Registration --- - a.wasmRuntime = runtime // Store the runtime closer - - // Instantiate WASI onto the shared runtime. If this fails, the agent is unusable for WASM. - if _, err := wasi_snapshot_preview1.Instantiate(ctx, runtime); err != nil { + // Instantiate WASI + if _, err = wasi_snapshot_preview1.Instantiate(ctx, runtime); err != nil { log.Error(ctx, "Failed to instantiate WASI on shared Wazero runtime, MCP WASM agent disabled", "error", err) - // Close runtime and cache if WASI fails _ = runtime.Close(ctx) - if a.wasmCache != nil { - _ = a.wasmCache.Close(ctx) + if cache != nil { + _ = cache.Close(ctx) } - return nil // Cannot proceed if WASI fails + return nil // Fatal error } - log.Info(ctx, "Shared Wazero runtime, WASI, and host functions initialized for MCP agent") + // --- End Shared Resource Setup --- + + // Create the WASM-specific implementation + agentImpl = newMCPWasm(runtime, cache) // Pass shared resources + log.Info(ctx, "Shared Wazero runtime, WASI, cache, and host functions initialized for MCP agent") + + } else { + log.Info("Configuring MCP agent for native execution", "path", McpServerPath) + // Create the native-specific implementation + agentImpl = newMCPNative() } - log.Info("MCP Agent created, server will be started on first request", "serverPath", McpServerPath) - return a + // Return the wrapper agent + log.Info("MCP Agent implementation created successfully") + return &MCPAgent{impl: agentImpl} } +// NewAgentForTesting is a constructor specifically for tests. +// It creates the appropriate implementation based on McpServerPath +// and injects a mock mcpClient into its ClientOverride field. +func NewAgentForTesting(mockClient mcpClient) agents.Interface { + // We need to replicate the logic from mcpConstructor to determine + // the implementation type, but without actually starting processes. + + var agentImpl mcpImplementation + + if strings.HasSuffix(McpServerPath, ".wasm") { + // For WASM testing, we might not need the full runtime setup, + // just the struct. Pass nil for shared resources for now. + // We rely on the mockClient being used before any real WASM interaction. + wasmImpl := newMCPWasm(nil, nil) // Pass nil runtime/cache + wasmImpl.ClientOverride = mockClient + agentImpl = wasmImpl + } else { + nativeImpl := newMCPNative() + nativeImpl.ClientOverride = mockClient + agentImpl = nativeImpl + } + + return &MCPAgent{impl: agentImpl} +} + +// --- MCPAgent Method Implementations (Delegation) --- + func (a *MCPAgent) AgentName() string { return McpAgentName } -// cleanup closes existing resources (stdin, server process/module). -// MUST be called while holding the mutex. -func (a *MCPAgent) cleanup() { - log.Debug(context.Background(), "Cleaning up MCP agent instance resources...") - if a.stdin != nil { - _ = a.stdin.Close() - a.stdin = nil +func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) { + if a.impl == nil { + return "", errors.New("MCP agent implementation is nil") } - // Clean up native process if it exists - if a.cmd != nil && a.cmd.Process != nil { - log.Debug(context.Background(), "Killing native MCP process", "pid", a.cmd.Process.Pid) - if err := a.cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) { - log.Error(context.Background(), "Failed to kill native process", "pid", a.cmd.Process.Pid, "error", err) - } - // Wait might return an error if already killed/exited, ignore it. - _ = a.cmd.Wait() - a.cmd = nil - } - // Clean up WASM module instance if it exists - if a.wasmModule != nil { - log.Debug(context.Background(), "Closing WASM module instance") - ctxClose, cancel := context.WithTimeout(context.Background(), 2*time.Second) - if err := a.wasmModule.Close(ctxClose); err != nil { - // Ignore context deadline exceeded as it means close was successful but slow - if !errors.Is(err, context.DeadlineExceeded) { - log.Error(context.Background(), "Failed to close WASM module instance", "error", err) - } - } - cancel() - a.wasmModule = nil - } - // Clean up compiled module ref for this instance - if a.wasmCompiled != nil { - log.Debug(context.Background(), "Closing compiled WASM module ref") - // Use background context, Close should be quick - if err := a.wasmCompiled.Close(context.Background()); err != nil { - log.Error(context.Background(), "Failed to close compiled WASM module ref", "error", err) - } - a.wasmCompiled = nil - } - - // DO NOT close shared wasmRuntime or wasmCache here. - - // Mark client as invalid - a.client = nil + return a.impl.GetArtistBiography(ctx, id, name, mbid) } -// ensureClientInitialized starts the MCP server process (native or WASM) -// and initializes the client if needed. Attempts restart on failure. -func (a *MCPAgent) ensureClientInitialized(ctx context.Context) (err error) { - // --- Use override if provided (for testing) --- - if a.ClientOverride != nil { - a.mu.Lock() - if a.client == nil { - a.client = a.ClientOverride - log.Debug(ctx, "Using provided MCP client override for testing") - } - a.mu.Unlock() - return nil +func (a *MCPAgent) GetArtistURL(ctx context.Context, id, name, mbid string) (string, error) { + if a.impl == nil { + return "", errors.New("MCP agent implementation is nil") } - - // --- Check if already initialized (critical section) --- - a.mu.Lock() - if a.client != nil { - a.mu.Unlock() - return nil - } - - // --- Client is nil, proceed with initialization *while holding the lock* --- - defer a.mu.Unlock() - - log.Info(ctx, "Initializing MCP client and starting/restarting server process...", "serverPath", McpServerPath) - - // Clean up any old resources *before* starting new ones - a.cleanup() - - var hostStdinWriter io.WriteCloser - var hostStdoutReader io.ReadCloser - var startErr error - var isWasm bool - - if strings.HasSuffix(McpServerPath, ".wasm") { - isWasm = true - // Check if shared runtime exists (it should if constructor succeeded for WASM) - if a.wasmRuntime == nil { - startErr = errors.New("shared Wazero runtime not initialized") - } else { - var mod api.Module - var compiled api.Closer // Store compiled module ref per instance - hostStdinWriter, hostStdoutReader, mod, compiled, startErr = a.startWasmModule(ctx) - if startErr == nil { - a.wasmModule = mod - a.wasmCompiled = compiled // Store compiled ref for cleanup - } else { - // Ensure potential partial resources from startWasmModule are closed on error - // startWasmModule's deferred cleanup should handle pipes and compiled module. - // Mod instance might need closing if instantiation partially succeeded before erroring. - if mod != nil { - _ = mod.Close(ctx) - } - // Do not close shared runtime here - } - } - } else { - isWasm = false - var nativeCmd *exec.Cmd - hostStdinWriter, hostStdoutReader, nativeCmd, startErr = a.startNativeProcess(ctx) - if startErr == nil { - a.cmd = nativeCmd - } - } - - if startErr != nil { - log.Error(ctx, "Failed to start MCP server process/module", "isWasm", isWasm, "error", startErr) - // Ensure pipes are closed if start failed (start functions might have deferred closes, but belt-and-suspenders) - if hostStdinWriter != nil { - _ = hostStdinWriter.Close() - } - if hostStdoutReader != nil { - _ = hostStdoutReader.Close() - } - // a.cleanup() was already called, specific resources (cmd/wasmModule) are nil - return fmt.Errorf("failed to start MCP server: %w", startErr) - } - - // --- Initialize MCP client --- (Ensure stdio transport import) - transport := stdio.NewStdioServerTransportWithIO(hostStdoutReader, hostStdinWriter) - clientImpl := mcp.NewClient(transport) - - initCtx, cancel := context.WithTimeout(context.Background(), initializationTimeout) - defer cancel() - if _, initErr := clientImpl.Initialize(initCtx); initErr != nil { - err = fmt.Errorf("failed to initialize MCP client: %w", initErr) - log.Error(ctx, "MCP client initialization failed after process/module start", "isWasm", isWasm, "error", err) - // Cleanup the newly started process/module and pipes as init failed - a.cleanup() // This should handle cmd/wasmModule - // Close the pipes directly as cleanup() doesn't know about them - if hostStdinWriter != nil { - _ = hostStdinWriter.Close() - } - if hostStdoutReader != nil { - _ = hostStdoutReader.Close() - } - return err // defer mu.Unlock() will run - } - - // --- Initialization successful, update agent state (still holding lock) --- - a.stdin = hostStdinWriter // This is the pipe the agent writes to - a.client = clientImpl - // cmd or wasmModule/Runtime/Compiled are already set by the start helpers - - log.Info(ctx, "MCP client initialized successfully", "isWasm", isWasm) - // defer mu.Unlock() runs here - return nil // Success + return a.impl.GetArtistURL(ctx, id, name, mbid) } -// startNativeProcess was moved to mcp_process_native.go - -// startWasmModule loads and starts the MCP server as a WASM module using the agent's shared Wazero runtime. -func (a *MCPAgent) startWasmModule(ctx context.Context) (hostStdinWriter io.WriteCloser, hostStdoutReader io.ReadCloser, mod api.Module, compiled api.Closer, err error) { - log.Debug(ctx, "Loading WASM MCP server module", "path", McpServerPath) - wasmBytes, err := os.ReadFile(McpServerPath) - if err != nil { - return nil, nil, nil, nil, fmt.Errorf("read wasm file: %w", err) - } - - // Create pipes for stdio redirection - wasmStdinReader, hostStdinWriter, err := os.Pipe() - if err != nil { - return nil, nil, nil, nil, fmt.Errorf("wasm stdin pipe: %w", err) - } - // Defer close pipes on error exit - defer func() { - if err != nil { - _ = wasmStdinReader.Close() - _ = hostStdinWriter.Close() - // hostStdoutReader and wasmStdoutWriter handled below - } - }() - - hostStdoutReader, wasmStdoutWriter, err := os.Pipe() - if err != nil { - _ = wasmStdinReader.Close() // Close previous pipe - _ = hostStdinWriter.Close() // Close previous pipe - return nil, nil, nil, nil, fmt.Errorf("wasm stdout pipe: %w", err) - } - // Defer close pipes on error exit - defer func() { - if err != nil { - _ = hostStdoutReader.Close() - _ = wasmStdoutWriter.Close() - } - }() - - // Use the SHARDED runtime from the agent struct - runtime, ok := a.wasmRuntime.(wazero.Runtime) - if !ok || runtime == nil { - return nil, nil, nil, nil, errors.New("wasmRuntime is not initialized or not a wazero.Runtime") - } - // WASI is already instantiated on the shared runtime - - config := wazero.NewModuleConfig(). - WithStdin(wasmStdinReader). - WithStdout(wasmStdoutWriter). - WithStderr(os.Stderr). - WithArgs(McpServerPath). - // Grant access to the host filesystem. Needed for DNS lookup (/etc/resolv.conf) - // and potentially other operations depending on the module. - // SECURITY: This grants broad access; consider more restricted FS if needed. - WithFS(os.DirFS("/")) - - log.Debug(ctx, "Compiling WASM module (using cache if enabled)...") - // Compile module using the shared runtime (which uses the configured cache) - compiledModule, err := runtime.CompileModule(ctx, wasmBytes) - if err != nil { - return nil, nil, nil, nil, fmt.Errorf("compile wasm module: %w", err) - } - // Defer closing compiled module in case of errors later in this function. - // Caller (ensureClientInitialized) is responsible for closing on success. - shouldCloseCompiledOnError := true - defer func() { - if shouldCloseCompiledOnError && compiledModule != nil { - _ = compiledModule.Close(context.Background()) - } - }() - - log.Info(ctx, "Instantiating WASM module (will run _start)...") - var instance api.Module - instanceErrChan := make(chan error, 1) - go func() { - var instantiateErr error - // Use context.Background() for the module's main execution context - instance, instantiateErr = runtime.InstantiateModule(context.Background(), compiledModule, config) - instanceErrChan <- instantiateErr - }() - - // Wait briefly for immediate instantiation errors - select { - case instantiateErr := <-instanceErrChan: - if instantiateErr != nil { - log.Error(ctx, "Failed to instantiate WASM module", "error", instantiateErr) - // compiledModule closed by defer - // pipes closed by defer - return nil, nil, nil, nil, fmt.Errorf("instantiate wasm module: %w", instantiateErr) - } - // If instantiateErr is nil here, the module exited immediately without error. Log it. - log.Warn(ctx, "WASM module instantiation returned (exited?) immediately without error.") - // Proceed to start monitoring, but return the (already closed) instance - // Pipes will be closed by the successful return path. - case <-time.After(2 * time.Second): - log.Debug(ctx, "WASM module instantiation likely blocking (server running), proceeding...") - } - - // Start a monitoring goroutine for WASM module exit/error - go func(modToMonitor api.Module, compiledToClose api.Closer, errChan chan error) { - // This will block until the instance created by InstantiateModule exits or errors. - instantiateErr := <-errChan - - a.mu.Lock() - log.Warn("WASM module exited/errored", "error", instantiateErr) - // Check if the module currently stored in the agent is the one we were monitoring. - // Use the central cleanup which handles nil checks. - if a.wasmModule == modToMonitor { - a.cleanup() // This will close the module instance and compiled ref - log.Info("MCP agent state cleaned up after WASM module exit/error") - } else { - // This case can happen if cleanup was called manually or if a new instance - // was started before the old one finished exiting. - log.Debug("WASM module exited, but state already updated or module mismatch. Explicitly closing compiled ref if needed.") - // Manually close the compiled module ref associated with this specific instance - // as cleanup() won't if a.wasmModule doesn't match or is nil. - if compiledToClose != nil { - _ = compiledToClose.Close(context.Background()) - } - } - a.mu.Unlock() - }(instance, compiledModule, instanceErrChan) // Pass necessary refs - - // Success: prevent deferred cleanup of compiled module, return resources needed by caller - shouldCloseCompiledOnError = false - return hostStdinWriter, hostStdoutReader, instance, compiledModule, nil // Return instance and compiled module -} +// Note: A Close method on MCPAgent itself isn't part of agents.Interface. +// Cleanup of the specific implementation happens via impl.Close(). +// Cleanup of shared Wazero resources needs separate handling (e.g., on app shutdown). // ArtistArgs defines the structure for MCP tool arguments requiring artist info. -// Exported for use in tests. +// Keep it here as it's used by both implementations indirectly via callMCPTool. type ArtistArgs struct { ID string `json:"id"` Name string `json:"name"` Mbid string `json:"mbid,omitempty"` } -// callMCPTool is a helper to perform the common steps of calling an MCP tool. -func (a *MCPAgent) callMCPTool(ctx context.Context, toolName string, args any) (string, error) { - // Ensure the client is initialized and the server is running (attempts restart if needed) - if err := a.ensureClientInitialized(ctx); err != nil { - log.Error(ctx, "MCP agent initialization/restart failed, cannot call tool", "tool", toolName, "error", err) - return "", fmt.Errorf("MCP agent not ready: %w", err) - } - - // Lock to safely access the shared client resource - a.mu.Lock() - - // Check if the client is valid *after* ensuring initialization and acquiring lock. - if a.client == nil { - a.mu.Unlock() // Release lock before returning error - log.Error(ctx, "MCP client became invalid after initialization check (server process likely died)", "tool", toolName) - return "", fmt.Errorf("MCP agent process is not running") - } - - // Keep a reference to the client while locked - currentClient := a.client - a.mu.Unlock() // *Release lock before* making the potentially blocking MCP call - - // Call the tool using the client reference - log.Debug(ctx, "Calling MCP tool", "tool", toolName, "args", args) - response, err := currentClient.CallTool(ctx, toolName, args) - if err != nil { - // Handle potential pipe closures or other communication errors - log.Error(ctx, "Failed to call MCP tool", "tool", toolName, "error", err) - // Check if the error indicates a broken pipe, suggesting the server died - if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") || strings.Contains(err.Error(), "EOF") { - log.Warn(ctx, "MCP tool call failed, possibly due to server process exit. State will be reset.", "tool", toolName) - // State reset is handled by the monitoring goroutine, just return error - return "", fmt.Errorf("MCP agent process communication error: %w", err) - } - return "", fmt.Errorf("failed to call MCP tool '%s': %w", toolName, err) - } - - // Process the response - if response == nil || len(response.Content) == 0 || response.Content[0].TextContent == nil || response.Content[0].TextContent.Text == "" { - log.Warn(ctx, "MCP tool returned empty or invalid response structure", "tool", toolName) - return "", agents.ErrNotFound - } - - // Check if the returned text content itself indicates an error from the MCP tool - resultText := response.Content[0].TextContent.Text - if strings.HasPrefix(resultText, "handler returned an error:") { - log.Warn(ctx, "MCP tool returned an error message in its response", "tool", toolName, "mcpError", resultText) - return "", agents.ErrNotFound // Treat MCP tool errors as "not found" - } - - // Return the successful text content - log.Debug(ctx, "Received response from MCP agent", "tool", toolName, "length", len(resultText)) - return resultText, nil -} - -// GetArtistBiography retrieves the artist biography by calling the external MCP server. -func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) { - args := ArtistArgs{ - ID: id, - Name: name, - Mbid: mbid, - } - return a.callMCPTool(ctx, McpToolNameGetBio, args) -} - -// GetArtistURL retrieves the artist URL by calling the external MCP server. -func (a *MCPAgent) GetArtistURL(ctx context.Context, id, name, mbid string) (string, error) { - args := ArtistArgs{ - ID: id, - Name: name, - Mbid: mbid, - } - return a.callMCPTool(ctx, McpToolNameGetURL, args) -} - -// Ensure MCPAgent implements the required interfaces +// Ensure MCPAgent still fulfills the registration requirements indirectly var _ agents.ArtistBiographyRetriever = (*MCPAgent)(nil) var _ agents.ArtistURLRetriever = (*MCPAgent)(nil) func init() { + // Register the real constructor, not the test one agents.Register(McpAgentName, mcpConstructor) } + +// Core logic moved to implementations. diff --git a/core/agents/mcp/mcp_agent_test.go b/core/agents/mcp/mcp_agent_test.go index c1f3d0e09..d64fea2d6 100644 --- a/core/agents/mcp/mcp_agent_test.go +++ b/core/agents/mcp/mcp_agent_test.go @@ -13,8 +13,11 @@ import ( . "github.com/onsi/gomega" ) -// mcpClient defines the interface for the MCP client methods used by the agent. -// This allows mocking the client for testing. +// Use the exported mcpClient interface from the mcp package +// type internalMCPClient = mcp.InternalMCPClient // Assuming you export the interface // REMOVE THIS LINE + +// Define the mcpClient interface locally for mocking, matching the one +// used internally by MCPNative/MCPWasm. type mcpClient interface { Initialize(ctx context.Context) (*mcp_client.InitializeResponse, error) CallTool(ctx context.Context, toolName string, args any) (*mcp_client.ToolResponse, error) @@ -44,13 +47,15 @@ func (m *mockMCPClient) CallTool(ctx context.Context, toolName string, args any) return &mcp_client.ToolResponse{}, nil } -// Ensure mock implements the interface (compile-time check) +// Ensure mock implements the local interface (compile-time check) var _ mcpClient = (*mockMCPClient)(nil) var _ = Describe("MCPAgent", func() { var ( - ctx context.Context - agent *mcp.MCPAgent // Use concrete type from the package + ctx context.Context + // We test the public MCPAgent wrapper, which uses the implementations internally. + // The actual agent instance might be native or wasm depending on McpServerPath + agent agents.Interface // Use the public agents.Interface mockClient *mockMCPClient ) @@ -60,12 +65,19 @@ var _ = Describe("MCPAgent", func() { callToolArgs: make([]any, 0), // Reset args on each test } - // Instantiate the real agent - agent = &mcp.MCPAgent{} - // Inject the mock client directly using the exported override field - agent.ClientOverride = mockClient + // Instantiate the real agent using a testing constructor + // This constructor needs to be added to the mcp package. + agent = mcp.NewAgentForTesting(mockClient) + Expect(agent).NotTo(BeNil(), "Agent should be created") }) + // Helper to get the concrete agent type for calling specific methods + getConcreteAgent := func() *mcp.MCPAgent { + concreteAgent, ok := agent.(*mcp.MCPAgent) + Expect(ok).To(BeTrue(), "Agent should be of type *mcp.MCPAgent") + return concreteAgent + } + Describe("GetArtistBiography", func() { It("should call the correct tool and return the biography", func() { expectedBio := "This is the artist bio." @@ -79,7 +91,7 @@ var _ = Describe("MCPAgent", func() { return mcp_client.NewToolResponse(mcp_client.NewTextContent(expectedBio)), nil } - bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") + bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") Expect(err).NotTo(HaveOccurred()) Expect(bio).To(Equal(expectedBio)) }) @@ -90,10 +102,16 @@ var _ = Describe("MCPAgent", func() { return nil, expectedErr } - bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") + bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") Expect(err).To(HaveOccurred()) + // The error originates from the implementation now, check for specific part + Expect(err.Error()).To(SatisfyAny( + ContainSubstring("native MCP agent not ready"), // Error from native + ContainSubstring("WASM MCP agent not ready"), // Error from WASM + ContainSubstring("failed to call native MCP tool"), + ContainSubstring("failed to call WASM MCP tool"), + )) Expect(errors.Is(err, expectedErr)).To(BeTrue()) - Expect(err.Error()).To(ContainSubstring(fmt.Sprintf("failed to call MCP tool '%s'", mcp.McpToolNameGetBio))) Expect(bio).To(BeEmpty()) }) @@ -103,7 +121,7 @@ var _ = Describe("MCPAgent", func() { return mcp_client.NewToolResponse(), nil } - bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") + bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") Expect(err).To(MatchError(agents.ErrNotFound)) Expect(bio).To(BeEmpty()) }) @@ -114,7 +132,7 @@ var _ = Describe("MCPAgent", func() { return mcp_client.NewToolResponse(mcp_client.NewTextContent("")), nil } - bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") + bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") Expect(err).To(MatchError(agents.ErrNotFound)) Expect(bio).To(BeEmpty()) }) @@ -124,9 +142,12 @@ var _ = Describe("MCPAgent", func() { return nil, io.ErrClosedPipe } - bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") + bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("MCP agent process communication error")) + Expect(err.Error()).To(SatisfyAny( + ContainSubstring("native MCP agent process communication error"), + ContainSubstring("WASM MCP agent module communication error"), + )) Expect(errors.Is(err, io.ErrClosedPipe)).To(BeTrue()) Expect(bio).To(BeEmpty()) }) @@ -137,7 +158,7 @@ var _ = Describe("MCPAgent", func() { return mcp_client.NewToolResponse(mcp_client.NewTextContent(mcpErrorString)), nil } - bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") + bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1") Expect(err).To(MatchError(agents.ErrNotFound)) Expect(bio).To(BeEmpty()) }) @@ -156,7 +177,7 @@ var _ = Describe("MCPAgent", func() { return mcp_client.NewToolResponse(mcp_client.NewTextContent(expectedURL)), nil } - url, err := agent.GetArtistURL(ctx, "id2", "Another Artist", "mbid2") + url, err := getConcreteAgent().GetArtistURL(ctx, "id2", "Another Artist", "mbid2") Expect(err).NotTo(HaveOccurred()) Expect(url).To(Equal(expectedURL)) }) @@ -167,10 +188,15 @@ var _ = Describe("MCPAgent", func() { return nil, expectedErr } - url, err := agent.GetArtistURL(ctx, "id2", "Another Artist", "mbid2") + url, err := getConcreteAgent().GetArtistURL(ctx, "id2", "Another Artist", "mbid2") Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(SatisfyAny( + ContainSubstring("native MCP agent not ready"), // Error from native + ContainSubstring("WASM MCP agent not ready"), // Error from WASM + ContainSubstring("failed to call native MCP tool"), + ContainSubstring("failed to call WASM MCP tool"), + )) Expect(errors.Is(err, expectedErr)).To(BeTrue()) - Expect(err.Error()).To(ContainSubstring(fmt.Sprintf("failed to call MCP tool '%s'", mcp.McpToolNameGetURL))) Expect(url).To(BeEmpty()) }) @@ -180,7 +206,7 @@ var _ = Describe("MCPAgent", func() { return mcp_client.NewToolResponse(), nil } - url, err := agent.GetArtistURL(ctx, "id2", "Another Artist", "mbid2") + url, err := getConcreteAgent().GetArtistURL(ctx, "id2", "Another Artist", "mbid2") Expect(err).To(MatchError(agents.ErrNotFound)) Expect(url).To(BeEmpty()) }) @@ -190,9 +216,12 @@ var _ = Describe("MCPAgent", func() { return nil, fmt.Errorf("write: %w", io.ErrClosedPipe) } - url, err := agent.GetArtistURL(ctx, "id2", "Another Artist", "mbid2") + url, err := getConcreteAgent().GetArtistURL(ctx, "id2", "Another Artist", "mbid2") Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("MCP agent process communication error")) + Expect(err.Error()).To(SatisfyAny( + ContainSubstring("native MCP agent process communication error"), + ContainSubstring("WASM MCP agent module communication error"), + )) Expect(errors.Is(err, io.ErrClosedPipe)).To(BeTrue()) Expect(url).To(BeEmpty()) }) @@ -203,7 +232,7 @@ var _ = Describe("MCPAgent", func() { return mcp_client.NewToolResponse(mcp_client.NewTextContent(mcpErrorString)), nil } - url, err := agent.GetArtistURL(ctx, "id2", "Another Artist", "mbid2") + url, err := getConcreteAgent().GetArtistURL(ctx, "id2", "Another Artist", "mbid2") Expect(err).To(MatchError(agents.ErrNotFound)) Expect(url).To(BeEmpty()) }) diff --git a/core/agents/mcp/mcp_process_native.go b/core/agents/mcp/mcp_process_native.go index 8eb3395d1..568d18c22 100644 --- a/core/agents/mcp/mcp_process_native.go +++ b/core/agents/mcp/mcp_process_native.go @@ -2,58 +2,250 @@ package mcp import ( "context" + "errors" "fmt" "io" + "os" "os/exec" "strings" + "sync" + mcp "github.com/metoro-io/mcp-golang" + "github.com/metoro-io/mcp-golang/transport/stdio" + "github.com/navidrome/navidrome/core/agents" "github.com/navidrome/navidrome/log" ) -// startNativeProcess starts the MCP server as a native executable. -func (a *MCPAgent) startNativeProcess(ctx context.Context) (stdin io.WriteCloser, stdout io.ReadCloser, cmd *exec.Cmd, err error) { - log.Debug(ctx, "Starting native MCP server process", "path", McpServerPath) - cmd = exec.CommandContext(context.Background(), McpServerPath) // Use Background context for long-running process +// MCPNative implements the mcpImplementation interface for running the MCP server as a native process. +type MCPNative struct { + mu sync.Mutex + cmd *exec.Cmd // Stores the running command + stdin io.WriteCloser + client mcpClient - stdin, err = cmd.StdinPipe() + // ClientOverride allows injecting a mock client for testing this specific implementation. + ClientOverride mcpClient // TODO: Consider if this is the best way to test +} + +// newMCPNative creates a new instance of the native MCP agent implementation. +func newMCPNative() *MCPNative { + return &MCPNative{} +} + +// --- mcpImplementation interface methods --- + +func (n *MCPNative) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) { + args := ArtistArgs{ + ID: id, + Name: name, + Mbid: mbid, + } + return n.callMCPTool(ctx, McpToolNameGetBio, args) +} + +func (n *MCPNative) GetArtistURL(ctx context.Context, id, name, mbid string) (string, error) { + args := ArtistArgs{ + ID: id, + Name: name, + Mbid: mbid, + } + return n.callMCPTool(ctx, McpToolNameGetURL, args) +} + +func (n *MCPNative) Close() error { + n.mu.Lock() + defer n.mu.Unlock() + n.cleanupResources_locked() + return nil // Currently, cleanup doesn't return errors +} + +// --- Internal Helper Methods --- + +// ensureClientInitialized starts the MCP server process and initializes the client if needed. +// MUST be called with the mutex HELD. +func (n *MCPNative) ensureClientInitialized_locked(ctx context.Context) error { + // Use override if provided (for testing) + if n.ClientOverride != nil { + if n.client == nil { + n.client = n.ClientOverride + log.Debug(ctx, "Using provided MCP client override for native testing") + } + return nil + } + + // Check if already initialized + if n.client != nil { + return nil + } + + log.Info(ctx, "Initializing Native MCP client and starting/restarting server process...", "serverPath", McpServerPath) + + // Clean up any old resources *before* starting new ones + n.cleanupResources_locked() + + hostStdinWriter, hostStdoutReader, nativeCmd, startErr := n.startProcess_locked(ctx) + if startErr != nil { + log.Error(ctx, "Failed to start Native MCP server process", "error", startErr) + // Ensure pipes are closed if start failed (startProcess might handle this, but be sure) + if hostStdinWriter != nil { + _ = hostStdinWriter.Close() + } + if hostStdoutReader != nil { + _ = hostStdoutReader.Close() + } + return fmt.Errorf("failed to start native MCP server: %w", startErr) + } + + // --- Initialize MCP client --- + transport := stdio.NewStdioServerTransportWithIO(hostStdoutReader, hostStdinWriter) + clientImpl := mcp.NewClient(transport) + + initCtx, cancel := context.WithTimeout(context.Background(), initializationTimeout) + defer cancel() + if _, initErr := clientImpl.Initialize(initCtx); initErr != nil { + err := fmt.Errorf("failed to initialize native MCP client: %w", initErr) + log.Error(ctx, "Native MCP client initialization failed", "error", err) + // Cleanup the newly started process and close pipes + n.cmd = nativeCmd // Temporarily set cmd so cleanup can kill it + n.cleanupResources_locked() + if hostStdinWriter != nil { + _ = hostStdinWriter.Close() + } + if hostStdoutReader != nil { + _ = hostStdoutReader.Close() + } + return err + } + + // --- Initialization successful, update agent state --- + n.cmd = nativeCmd + n.stdin = hostStdinWriter // This is the pipe the agent writes to + n.client = clientImpl + + log.Info(ctx, "Native MCP client initialized successfully", "pid", n.cmd.Process.Pid) + return nil // Success +} + +// callMCPTool handles ensuring initialization and calling the MCP tool. +func (n *MCPNative) callMCPTool(ctx context.Context, toolName string, args any) (string, error) { + // Ensure the client is initialized and the server is running (attempts restart if needed) + n.mu.Lock() + err := n.ensureClientInitialized_locked(ctx) + if err != nil { + n.mu.Unlock() + log.Error(ctx, "Native MCP agent initialization/restart failed", "tool", toolName, "error", err) + return "", fmt.Errorf("native MCP agent not ready: %w", err) + } + + // Keep a reference to the client while locked + currentClient := n.client + // Unlock mutex *before* making the potentially blocking MCP call + n.mu.Unlock() + + // Call the tool using the client reference + log.Debug(ctx, "Calling Native MCP tool", "tool", toolName, "args", args) + response, callErr := currentClient.CallTool(ctx, toolName, args) + if callErr != nil { + // Handle potential pipe closures or other communication errors + log.Error(ctx, "Failed to call Native MCP tool", "tool", toolName, "error", callErr) + // Check if the error indicates a broken pipe, suggesting the server died + // The monitoring goroutine will handle cleanup, just return error here. + if errors.Is(callErr, io.ErrClosedPipe) || strings.Contains(callErr.Error(), "broken pipe") || strings.Contains(callErr.Error(), "EOF") { + log.Warn(ctx, "Native MCP tool call failed, possibly due to server process exit.", "tool", toolName) + // No need to explicitly call cleanup, monitoring goroutine handles it. + return "", fmt.Errorf("native MCP agent process communication error: %w", callErr) + } + return "", fmt.Errorf("failed to call native MCP tool '%s': %w", toolName, callErr) + } + + // Process the response (same logic as before) + if response == nil || len(response.Content) == 0 || response.Content[0].TextContent == nil || response.Content[0].TextContent.Text == "" { + log.Warn(ctx, "Native MCP tool returned empty/invalid response", "tool", toolName) + // Treat as not found for agent interface consistency + return "", agents.ErrNotFound // Import agents package if needed, or define locally + } + resultText := response.Content[0].TextContent.Text + if strings.HasPrefix(resultText, "handler returned an error:") { + log.Warn(ctx, "Native MCP tool returned an error message", "tool", toolName, "mcpError", resultText) + return "", agents.ErrNotFound // Treat MCP tool errors as "not found" + } + + log.Debug(ctx, "Received response from Native MCP agent", "tool", toolName, "length", len(resultText)) + return resultText, nil +} + +// cleanupResources closes existing resources (stdin, server process). +// MUST be called with the mutex HELD. +func (n *MCPNative) cleanupResources_locked() { + log.Debug(context.Background(), "Cleaning up Native MCP instance resources...") + if n.stdin != nil { + _ = n.stdin.Close() + n.stdin = nil + } + if n.cmd != nil && n.cmd.Process != nil { + pid := n.cmd.Process.Pid + log.Debug(context.Background(), "Killing native MCP process", "pid", pid) + // Kill the process. Ignore error if it's already done. + if err := n.cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) { + log.Error(context.Background(), "Failed to kill native process", "pid", pid, "error", err) + } + // Wait for the process to release resources. Ignore error. + _ = n.cmd.Wait() + n.cmd = nil + } + // Mark client as invalid + n.client = nil +} + +// startProcess starts the MCP server as a native executable and sets up monitoring. +// MUST be called with the mutex HELD. +func (n *MCPNative) startProcess_locked(ctx context.Context) (stdin io.WriteCloser, stdout io.ReadCloser, cmd *exec.Cmd, err error) { + log.Debug(ctx, "Starting native MCP server process", "path", McpServerPath) + // Use Background context for the command itself, as it should outlive the request context (ctx) + cmd = exec.CommandContext(context.Background(), McpServerPath) + + stdinPipe, err := cmd.StdinPipe() if err != nil { return nil, nil, nil, fmt.Errorf("native stdin pipe: %w", err) } - stdout, err = cmd.StdoutPipe() + stdoutPipe, err := cmd.StdoutPipe() if err != nil { - _ = stdin.Close() + _ = stdinPipe.Close() return nil, nil, nil, fmt.Errorf("native stdout pipe: %w", err) } - var stderr strings.Builder - cmd.Stderr = &stderr + var stderrBuf strings.Builder + cmd.Stderr = &stderrBuf if err = cmd.Start(); err != nil { - _ = stdin.Close() - _ = stdout.Close() + _ = stdinPipe.Close() + _ = stdoutPipe.Close() return nil, nil, nil, fmt.Errorf("native start: %w", err) } currentPid := cmd.Process.Pid + currentCmd := cmd // Capture the current cmd pointer for the goroutine log.Info(ctx, "Native MCP server process started", "pid", currentPid) // Start monitoring goroutine - go func(processCmd *exec.Cmd, processStderr *strings.Builder, processPid int) { - waitErr := processCmd.Wait() // Wait for the process to exit - a.mu.Lock() - log.Warn("Native MCP server process exited", "pid", processPid, "error", waitErr, "stderr", processStderr.String()) - // Check if the cmd matches the one we are monitoring before cleaning up - // Use the central cleanup function which handles nil checks. - if a.cmd == processCmd { - a.cleanup() // Use the central cleanup function - log.Info("MCP agent state cleaned up after native process exit", "pid", processPid) + go func() { + waitErr := currentCmd.Wait() // Wait for the specific process this goroutine monitors + n.mu.Lock() + stderrStr := stderrBuf.String() + log.Warn("Native MCP server process exited", "pid", currentPid, "error", waitErr, "stderr", stderrStr) + + // Critical: Check if the agent's current command is STILL the one we were monitoring. + // If it's different, it means cleanup/restart already happened, so we shouldn't cleanup again. + if n.cmd == currentCmd { + n.cleanupResources_locked() // Use the locked version as we hold the lock + log.Info("MCP Native agent state cleaned up after process exit", "pid", currentPid) } else { - log.Debug("Native MCP agent process exited, but state already updated or cmd mismatch", "exitedPid", processPid) + log.Debug("Native MCP process exited, but state already updated/cmd mismatch", "exitedPid", currentPid) } - a.mu.Unlock() - }(cmd, &stderr, currentPid) + n.mu.Unlock() + }() // Return the pipes connected to the process and the Cmd object - return stdin, stdout, cmd, nil + return stdinPipe, stdoutPipe, cmd, nil } diff --git a/core/agents/mcp/mcp_process_wazero.go b/core/agents/mcp/mcp_process_wazero.go new file mode 100644 index 000000000..81bf79089 --- /dev/null +++ b/core/agents/mcp/mcp_process_wazero.go @@ -0,0 +1,352 @@ +package mcp + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "strings" + "sync" + "time" + + mcp "github.com/metoro-io/mcp-golang" + "github.com/metoro-io/mcp-golang/transport/stdio" + "github.com/navidrome/navidrome/core/agents" // Needed for ErrNotFound + "github.com/navidrome/navidrome/log" + "github.com/tetratelabs/wazero" // Needed for types + "github.com/tetratelabs/wazero/api" // Needed for types +) + +// MCPWasm implements the mcpImplementation interface for running the MCP server as a WASM module. +type MCPWasm struct { + mu sync.Mutex + wasmModule api.Module // Stores the instantiated module + wasmCompiled api.Closer // Stores the compiled module reference for this instance + stdin io.WriteCloser + client mcpClient + + // Shared resources (passed in, not owned by this struct) + wasmRuntime api.Closer // Closer for the shared Wazero Runtime + wasmCache wazero.CompilationCache // Shared Compilation Cache (can be nil) + + // ClientOverride allows injecting a mock client for testing this specific implementation. + ClientOverride mcpClient // TODO: Consider if this is the best way to test +} + +// newMCPWasm creates a new instance of the WASM MCP agent implementation. +func newMCPWasm(runtime api.Closer, cache wazero.CompilationCache) *MCPWasm { + return &MCPWasm{ + wasmRuntime: runtime, + wasmCache: cache, + } +} + +// --- mcpImplementation interface methods --- + +func (w *MCPWasm) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) { + args := ArtistArgs{ + ID: id, + Name: name, + Mbid: mbid, + } + return w.callMCPTool(ctx, McpToolNameGetBio, args) +} + +func (w *MCPWasm) GetArtistURL(ctx context.Context, id, name, mbid string) (string, error) { + args := ArtistArgs{ + ID: id, + Name: name, + Mbid: mbid, + } + return w.callMCPTool(ctx, McpToolNameGetURL, args) +} + +// Close cleans up instance-specific WASM resources. +// It does NOT close the shared runtime or cache. +func (w *MCPWasm) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + w.cleanupResources_locked() + return nil // Currently, cleanup doesn't return errors +} + +// --- Internal Helper Methods --- + +// ensureClientInitialized starts the MCP WASM module and initializes the client if needed. +// MUST be called with the mutex HELD. +func (w *MCPWasm) ensureClientInitialized_locked(ctx context.Context) error { + // Use override if provided (for testing) + if w.ClientOverride != nil { + if w.client == nil { + w.client = w.ClientOverride + log.Debug(ctx, "Using provided MCP client override for WASM testing") + } + return nil + } + + // Check if already initialized + if w.client != nil { + return nil + } + + log.Info(ctx, "Initializing WASM MCP client and starting/restarting server module...", "serverPath", McpServerPath) + + // Clean up any old instance resources *before* starting new ones + w.cleanupResources_locked() + + // Check if shared runtime exists (it should if constructor succeeded) + if w.wasmRuntime == nil { + return errors.New("shared Wazero runtime not initialized for MCPWasm") + } + + hostStdinWriter, hostStdoutReader, mod, compiled, startErr := w.startModule_locked(ctx) + if startErr != nil { + log.Error(ctx, "Failed to start WASM MCP server module", "error", startErr) + // Ensure pipes are closed if start failed + if hostStdinWriter != nil { + _ = hostStdinWriter.Close() + } + if hostStdoutReader != nil { + _ = hostStdoutReader.Close() + } + // startModule_locked handles cleanup of mod/compiled on error + return fmt.Errorf("failed to start WASM MCP server: %w", startErr) + } + + // --- Initialize MCP client --- + transport := stdio.NewStdioServerTransportWithIO(hostStdoutReader, hostStdinWriter) + clientImpl := mcp.NewClient(transport) + + initCtx, cancel := context.WithTimeout(context.Background(), initializationTimeout) + defer cancel() + if _, initErr := clientImpl.Initialize(initCtx); initErr != nil { + err := fmt.Errorf("failed to initialize WASM MCP client: %w", initErr) + log.Error(ctx, "WASM MCP client initialization failed", "error", err) + // Cleanup the newly started module and close pipes + w.wasmModule = mod // Temporarily set so cleanup can close it + w.wasmCompiled = compiled // Temporarily set so cleanup can close it + w.cleanupResources_locked() + if hostStdinWriter != nil { + _ = hostStdinWriter.Close() + } + if hostStdoutReader != nil { + _ = hostStdoutReader.Close() + } + return err + } + + // --- Initialization successful, update agent state --- + w.wasmModule = mod + w.wasmCompiled = compiled + w.stdin = hostStdinWriter // This is the pipe the agent writes to + w.client = clientImpl + + log.Info(ctx, "WASM MCP client initialized successfully") + return nil // Success +} + +// callMCPTool handles ensuring initialization and calling the MCP tool. +func (w *MCPWasm) callMCPTool(ctx context.Context, toolName string, args any) (string, error) { + // Ensure the client is initialized and the server is running (attempts restart if needed) + w.mu.Lock() + err := w.ensureClientInitialized_locked(ctx) + if err != nil { + w.mu.Unlock() + log.Error(ctx, "WASM MCP agent initialization/restart failed", "tool", toolName, "error", err) + return "", fmt.Errorf("WASM MCP agent not ready: %w", err) + } + + // Keep a reference to the client while locked + currentClient := w.client + // Unlock mutex *before* making the potentially blocking MCP call + w.mu.Unlock() + + // Call the tool using the client reference + log.Debug(ctx, "Calling WASM MCP tool", "tool", toolName, "args", args) + response, callErr := currentClient.CallTool(ctx, toolName, args) + if callErr != nil { + // Handle potential pipe closures or other communication errors + log.Error(ctx, "Failed to call WASM MCP tool", "tool", toolName, "error", callErr) + // Check if the error indicates a broken pipe, suggesting the server died + // The monitoring goroutine will handle cleanup, just return error here. + if errors.Is(callErr, io.ErrClosedPipe) || strings.Contains(callErr.Error(), "broken pipe") || strings.Contains(callErr.Error(), "EOF") { + log.Warn(ctx, "WASM MCP tool call failed, possibly due to server module exit.", "tool", toolName) + // No need to explicitly call cleanup, monitoring goroutine handles it. + return "", fmt.Errorf("WASM MCP agent module communication error: %w", callErr) + } + return "", fmt.Errorf("failed to call WASM MCP tool '%s': %w", toolName, callErr) + } + + // Process the response (same logic as native) + if response == nil || len(response.Content) == 0 || response.Content[0].TextContent == nil || response.Content[0].TextContent.Text == "" { + log.Warn(ctx, "WASM MCP tool returned empty/invalid response", "tool", toolName) + return "", agents.ErrNotFound + } + resultText := response.Content[0].TextContent.Text + if strings.HasPrefix(resultText, "handler returned an error:") { + log.Warn(ctx, "WASM MCP tool returned an error message", "tool", toolName, "mcpError", resultText) + return "", agents.ErrNotFound // Treat MCP tool errors as "not found" + } + + log.Debug(ctx, "Received response from WASM MCP agent", "tool", toolName, "length", len(resultText)) + return resultText, nil +} + +// cleanupResources closes instance-specific WASM resources (stdin, module, compiled ref). +// It specifically avoids closing the shared runtime or cache. +// MUST be called with the mutex HELD. +func (w *MCPWasm) cleanupResources_locked() { + log.Debug(context.Background(), "Cleaning up WASM MCP instance resources...") + if w.stdin != nil { + _ = w.stdin.Close() + w.stdin = nil + } + // Close the module instance + if w.wasmModule != nil { + log.Debug(context.Background(), "Closing WASM module instance") + ctxClose, cancel := context.WithTimeout(context.Background(), 2*time.Second) + if err := w.wasmModule.Close(ctxClose); err != nil && !errors.Is(err, context.DeadlineExceeded) { + log.Error(context.Background(), "Failed to close WASM module instance", "error", err) + } + cancel() + w.wasmModule = nil + } + // Close the compiled module reference for this instance + if w.wasmCompiled != nil { + log.Debug(context.Background(), "Closing compiled WASM module ref") + if err := w.wasmCompiled.Close(context.Background()); err != nil { + log.Error(context.Background(), "Failed to close compiled WASM module ref", "error", err) + } + w.wasmCompiled = nil + } + // Mark client as invalid + w.client = nil + // DO NOT CLOSE w.wasmRuntime or w.wasmCache here! +} + +// startModule loads and starts the MCP server as a WASM module. +// MUST be called with the mutex HELD. +func (w *MCPWasm) startModule_locked(ctx context.Context) (hostStdinWriter io.WriteCloser, hostStdoutReader io.ReadCloser, mod api.Module, compiled api.Closer, err error) { + log.Debug(ctx, "Loading WASM MCP server module", "path", McpServerPath) + wasmBytes, err := os.ReadFile(McpServerPath) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("read wasm file: %w", err) + } + + // Create pipes for stdio redirection + wasmStdinReader, hostStdinWriter, err := os.Pipe() + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("wasm stdin pipe: %w", err) + } + // Defer close pipes on error exit + shouldClosePipesOnError := true + defer func() { + if shouldClosePipesOnError { + if wasmStdinReader != nil { + _ = wasmStdinReader.Close() + } + if hostStdinWriter != nil { + _ = hostStdinWriter.Close() + } + // hostStdoutReader and wasmStdoutWriter handled below + } + }() + + hostStdoutReader, wasmStdoutWriter, err := os.Pipe() + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("wasm stdout pipe: %w", err) + } + // Defer close pipes on error exit + defer func() { + if shouldClosePipesOnError { + if hostStdoutReader != nil { + _ = hostStdoutReader.Close() + } + if wasmStdoutWriter != nil { + _ = wasmStdoutWriter.Close() + } + } + }() + + // Use the SHARDED runtime from the agent struct + runtime, ok := w.wasmRuntime.(wazero.Runtime) + if !ok || runtime == nil { + return nil, nil, nil, nil, errors.New("wasmRuntime is not initialized or not a wazero.Runtime") + } + + // Prepare module configuration + // Host functions and WASI are already part of the shared runtime + config := wazero.NewModuleConfig(). + WithStdin(wasmStdinReader). + WithStdout(wasmStdoutWriter). + WithStderr(os.Stderr). + WithArgs(McpServerPath). + WithFS(os.DirFS("/")) // Keep FS access for now + + log.Debug(ctx, "Compiling WASM module (using cache if enabled)...") + // Compile module using the shared runtime + compiledModule, err := runtime.CompileModule(ctx, wasmBytes) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("compile wasm module: %w", err) + } + // Defer closing compiled module only if an error occurs later in this function. + shouldCloseCompiledOnError := true + defer func() { + if shouldCloseCompiledOnError && compiledModule != nil { + _ = compiledModule.Close(context.Background()) + } + }() + + log.Info(ctx, "Instantiating WASM module (will run _start)...") + var instance api.Module + instanceErrChan := make(chan error, 1) + go func() { + var instantiateErr error + // Use context.Background() for the module's main execution context + instance, instantiateErr = runtime.InstantiateModule(context.Background(), compiledModule, config) + instanceErrChan <- instantiateErr + }() + + // Wait briefly for immediate instantiation errors + select { + case instantiateErr := <-instanceErrChan: + if instantiateErr != nil { + log.Error(ctx, "Failed to instantiate WASM module", "error", instantiateErr) + // compiledModule closed by defer + // pipes closed by defer + return nil, nil, nil, nil, fmt.Errorf("instantiate wasm module: %w", instantiateErr) + } + log.Warn(ctx, "WASM module instantiation returned (exited?) immediately without error.") + case <-time.After(2 * time.Second): + log.Debug(ctx, "WASM module instantiation likely blocking (server running), proceeding...") + } + + // Start a monitoring goroutine for WASM module exit/error + // Pass required values to the goroutine closure + go func(instanceToMonitor api.Module, compiledToClose api.Closer, errChan chan error) { + // This blocks until the instance created by InstantiateModule exits or errors. + instantiateErr := <-errChan + + w.mu.Lock() // Lock the specific MCPWasm instance + log.Warn("WASM module exited/errored", "error", instantiateErr) + + // Critical: Check if the agent's current module is STILL the one we were monitoring. + if w.wasmModule == instanceToMonitor { + w.cleanupResources_locked() // Use the locked version + log.Info("MCP WASM agent state cleaned up after module exit/error") + } else { + log.Debug("WASM module exited, but state already updated/module mismatch. Explicitly closing this instance's compiled ref.") + // Manually close the compiled module ref associated with *this specific instance* + if compiledToClose != nil { + _ = compiledToClose.Close(context.Background()) + } + } + w.mu.Unlock() + }(instance, compiledModule, instanceErrChan) + + // Success: prevent deferred cleanup of pipes and compiled module + shouldClosePipesOnError = false + shouldCloseCompiledOnError = false + return hostStdinWriter, hostStdoutReader, instance, compiledModule, nil +}