feat: refactor MCP agent to support native and WASM implementations
Refactored the MCPAgent to delegate core functionality to separate implementations for native-process and WASM execution:

- Introduced an `mcpImplementation` interface (implemented by `MCPNative` and `MCPWasm`) to abstract the underlying execution method.
- The main `MCPAgent` now holds an instance of this interface, selected by `mcpConstructor` based on `McpServerPath` (native executable or `.wasm` file).
- Shared Wazero resources (runtime, compilation cache, WASI, host functions) are now initialized once in the constructor and passed to the `MCPWasm` implementation, improving resource management and potentially startup performance for WASM modules.
- Updated tests (`mcp_agent_test.go`) to use a new testing constructor (`NewAgentForTesting`) that injects a mock client into the appropriate implementation; assertions were adjusted to reflect the refactored error handling and structure.
- Also updated `McpServerPath` to use a relative path.
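Taken together, the new design is a small delegation pattern: the public agent holds an interface value and forwards every call, while the constructor decides which backend to build from the configured path. The self-contained sketch below mirrors that shape with illustrative names (`metadataRetriever`, `nativeBackend`, `wasmBackend` are stand-ins, not Navidrome's actual types); it omits the process/WASM startup, locking, and error handling that the real implementations carry.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// metadataRetriever plays the role of mcpImplementation in the diff:
// a common contract that both the native and WASM backends satisfy.
type metadataRetriever interface {
	GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error)
	Close() error
}

// nativeBackend and wasmBackend are trivial stand-ins for MCPNative/MCPWasm.
type nativeBackend struct{}

func (nativeBackend) GetArtistBiography(context.Context, string, string, string) (string, error) {
	return "bio from native process", nil
}
func (nativeBackend) Close() error { return nil }

type wasmBackend struct{}

func (wasmBackend) GetArtistBiography(context.Context, string, string, string) (string, error) {
	return "bio from WASM module", nil
}
func (wasmBackend) Close() error { return nil }

// agent is the thin wrapper, analogous to MCPAgent: it only delegates.
type agent struct{ impl metadataRetriever }

func (a *agent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) {
	if a.impl == nil {
		return "", errors.New("implementation is nil")
	}
	return a.impl.GetArtistBiography(ctx, id, name, mbid)
}

// newAgent selects the backend from the configured server path,
// mirroring mcpConstructor's ".wasm" suffix check.
func newAgent(serverPath string) *agent {
	if strings.HasSuffix(serverPath, ".wasm") {
		return &agent{impl: wasmBackend{}}
	}
	return &agent{impl: nativeBackend{}}
}

func main() {
	a := newAgent("./mcp-server.wasm")
	bio, _ := a.GetArtistBiography(context.Background(), "id", "name", "mbid")
	fmt.Println(bio) // prints: bio from WASM module
}
```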
This commit is contained in:
parent
2f71516dde
commit
ae93e555c9
@ -3,19 +3,15 @@ package mcp
import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	mcp "github.com/metoro-io/mcp-golang"
	"github.com/metoro-io/mcp-golang/transport/stdio"
	"github.com/tetratelabs/wazero"
	"github.com/tetratelabs/wazero/api"
	mcp "github.com/metoro-io/mcp-golang" // Needed for mcpClient interface types
	"github.com/tetratelabs/wazero"       // Needed for constructor

	// Needed for constructor types (api.Closer)
	"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"

	"github.com/navidrome/navidrome/conf"
@ -27,482 +23,176 @@ import (
// Exported constants for testing
const (
	McpAgentName = "mcp"
	McpServerPath = "./core/agents/mcp/mcp-server/mcp-server.wasm"
	McpToolNameGetBio = "get_artist_biography"
	McpToolNameGetURL = "get_artist_url"
	initializationTimeout = 10 * time.Second
	initializationTimeout = 5 * time.Second
	// McpServerPath defines the location of the MCP server executable or WASM module.
	McpServerPath = "./core/agents/mcp/mcp-server/mcp-server.wasm"
	McpToolNameGetBio = "get_artist_biography"
	McpToolNameGetURL = "get_artist_url"
)
|
||||
|
||||
// mcpClient interface matching the methods used from mcp.Client
|
||||
// Allows for mocking in tests.
|
||||
// Needs to be defined here as it's used by implementations
|
||||
type mcpClient interface {
|
||||
Initialize(ctx context.Context) (*mcp.InitializeResponse, error)
|
||||
CallTool(ctx context.Context, toolName string, args any) (*mcp.ToolResponse, error)
|
||||
}
|
||||
|
||||
// MCPAgent interacts with an external MCP server for metadata retrieval.
|
||||
// It keeps a single instance of the server process running and attempts restart on failure.
|
||||
// Supports both native executables and WASM modules (via Wazero).
|
||||
type MCPAgent struct {
|
||||
mu sync.Mutex
|
||||
|
||||
// Runtime state
|
||||
stdin io.WriteCloser
|
||||
client mcpClient
|
||||
cmd *exec.Cmd // Stores the native process command
|
||||
wasmModule api.Module // Stores the instantiated WASM module
|
||||
|
||||
// Shared Wazero resources (created once)
|
||||
wasmRuntime api.Closer // Shared Wazero Runtime (implements Close(context.Context))
|
||||
wasmCache wazero.CompilationCache // Shared Compilation Cache (implements Close(context.Context))
|
||||
|
||||
// WASM resources per instance (cleaned up by monitoring goroutine)
|
||||
wasmCompiled api.Closer // Stores the compiled WASM module for closing
|
||||
|
||||
// ClientOverride allows injecting a mock client for testing.
|
||||
// This field should ONLY be set in test code.
|
||||
ClientOverride mcpClient
|
||||
// mcpImplementation defines the common interface for both native and WASM MCP agents.
|
||||
// This allows the main MCPAgent to delegate calls without knowing the underlying type.
|
||||
type mcpImplementation interface {
|
||||
agents.ArtistBiographyRetriever
|
||||
agents.ArtistURLRetriever
|
||||
Close() error // For cleaning up resources associated with this specific implementation.
|
||||
}
|
||||
|
||||
// MCPAgent is the public-facing agent registered with Navidrome.
|
||||
// It acts as a wrapper around the actual implementation (native or WASM).
|
||||
type MCPAgent struct {
|
||||
// No mutex needed here if impl is set once at construction
|
||||
// and the implementation handles its own internal state synchronization.
|
||||
impl mcpImplementation
|
||||
|
||||
// We might need a way to close shared resources later, but for now,
|
||||
// the agent lifecycle is managed by the constructor and the impl.Close().
|
||||
}
|
||||
|
||||
// mcpConstructor creates the appropriate MCP implementation (native or WASM)
|
||||
// and wraps it in the MCPAgent.
|
||||
func mcpConstructor(ds model.DataStore) agents.Interface {
|
||||
// Check if the MCP server executable exists
|
||||
// Check if the MCP server executable/WASM exists
|
||||
if _, err := os.Stat(McpServerPath); os.IsNotExist(err) {
|
||||
log.Warn("MCP server executable/WASM not found, disabling agent", "path", McpServerPath, "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
a := &MCPAgent{}
|
||||
var agentImpl mcpImplementation
|
||||
var err error
|
||||
|
||||
// If it's a WASM path, pre-initialize the shared Wazero runtime, cache, and host functions.
|
||||
if strings.HasSuffix(McpServerPath, ".wasm") {
|
||||
log.Info("Configuring MCP agent for WASM execution", "path", McpServerPath)
|
||||
ctx := context.Background() // Use background context for setup
|
||||
|
||||
// --- Setup Shared Wazero Resources ---
|
||||
var cache wazero.CompilationCache
|
||||
cacheDir := filepath.Join(conf.Server.DataFolder, "cache", "wazero")
|
||||
if err := os.MkdirAll(cacheDir, 0755); err != nil {
|
||||
log.Error(ctx, "Failed to create Wazero cache directory, WASM caching disabled", "path", cacheDir, "error", err)
|
||||
if errMkdir := os.MkdirAll(cacheDir, 0755); errMkdir != nil {
|
||||
log.Error(ctx, "Failed to create Wazero cache directory, WASM caching disabled", "path", cacheDir, "error", errMkdir)
|
||||
} else {
|
||||
cache, err := wazero.NewCompilationCacheWithDir(cacheDir)
|
||||
cache, err = wazero.NewCompilationCacheWithDir(cacheDir)
|
||||
if err != nil {
|
||||
log.Error(ctx, "Failed to create Wazero compilation cache, WASM caching disabled", "path", cacheDir, "error", err)
|
||||
} else {
|
||||
a.wasmCache = cache
|
||||
log.Info(ctx, "Wazero compilation cache enabled", "path", cacheDir)
|
||||
cache = nil // Ensure cache is nil if creation failed
|
||||
}
|
||||
}
|
||||
|
||||
// Create runtime config, adding cache if it was created successfully
|
||||
// Create runtime config, adding cache if it exists
|
||||
runtimeConfig := wazero.NewRuntimeConfig()
|
||||
if a.wasmCache != nil {
|
||||
runtimeConfig = runtimeConfig.WithCompilationCache(a.wasmCache)
|
||||
if cache != nil {
|
||||
runtimeConfig = runtimeConfig.WithCompilationCache(cache)
|
||||
}
|
||||
|
||||
// Create the shared runtime
|
||||
runtime := wazero.NewRuntimeWithConfig(ctx, runtimeConfig)
|
||||
|
||||
// --- Register Host Functions --- Must happen BEFORE WASI instantiation if WASI needs them?
|
||||
// Actually, WASI instantiation is separate from host func instantiation.
|
||||
if err := registerHostFunctions(ctx, runtime); err != nil {
|
||||
// Error already logged by registerHostFunctions
|
||||
// Register host functions
|
||||
if err = registerHostFunctions(ctx, runtime); err != nil {
|
||||
_ = runtime.Close(ctx)
|
||||
if a.wasmCache != nil {
|
||||
_ = a.wasmCache.Close(ctx)
|
||||
if cache != nil {
|
||||
_ = cache.Close(ctx)
|
||||
}
|
||||
return nil
|
||||
return nil // Fatal error
|
||||
}
|
||||
// --- End Host Function Registration ---
|
||||
|
||||
a.wasmRuntime = runtime // Store the runtime closer
|
||||
|
||||
// Instantiate WASI onto the shared runtime. If this fails, the agent is unusable for WASM.
|
||||
if _, err := wasi_snapshot_preview1.Instantiate(ctx, runtime); err != nil {
|
||||
// Instantiate WASI
|
||||
if _, err = wasi_snapshot_preview1.Instantiate(ctx, runtime); err != nil {
|
||||
log.Error(ctx, "Failed to instantiate WASI on shared Wazero runtime, MCP WASM agent disabled", "error", err)
|
||||
// Close runtime and cache if WASI fails
|
||||
_ = runtime.Close(ctx)
|
||||
if a.wasmCache != nil {
|
||||
_ = a.wasmCache.Close(ctx)
|
||||
if cache != nil {
|
||||
_ = cache.Close(ctx)
|
||||
}
|
||||
return nil // Cannot proceed if WASI fails
|
||||
return nil // Fatal error
|
||||
}
|
||||
log.Info(ctx, "Shared Wazero runtime, WASI, and host functions initialized for MCP agent")
|
||||
// --- End Shared Resource Setup ---
|
||||
|
||||
// Create the WASM-specific implementation
|
||||
agentImpl = newMCPWasm(runtime, cache) // Pass shared resources
|
||||
log.Info(ctx, "Shared Wazero runtime, WASI, cache, and host functions initialized for MCP agent")
|
||||
|
||||
} else {
|
||||
log.Info("Configuring MCP agent for native execution", "path", McpServerPath)
|
||||
// Create the native-specific implementation
|
||||
agentImpl = newMCPNative()
|
||||
}
|
||||
|
||||
log.Info("MCP Agent created, server will be started on first request", "serverPath", McpServerPath)
|
||||
return a
|
||||
// Return the wrapper agent
|
||||
log.Info("MCP Agent implementation created successfully")
|
||||
return &MCPAgent{impl: agentImpl}
|
||||
}
|
||||
|
||||
// NewAgentForTesting is a constructor specifically for tests.
|
||||
// It creates the appropriate implementation based on McpServerPath
|
||||
// and injects a mock mcpClient into its ClientOverride field.
|
||||
func NewAgentForTesting(mockClient mcpClient) agents.Interface {
|
||||
// We need to replicate the logic from mcpConstructor to determine
|
||||
// the implementation type, but without actually starting processes.
|
||||
|
||||
var agentImpl mcpImplementation
|
||||
|
||||
if strings.HasSuffix(McpServerPath, ".wasm") {
|
||||
// For WASM testing, we might not need the full runtime setup,
|
||||
// just the struct. Pass nil for shared resources for now.
|
||||
// We rely on the mockClient being used before any real WASM interaction.
|
||||
wasmImpl := newMCPWasm(nil, nil) // Pass nil runtime/cache
|
||||
wasmImpl.ClientOverride = mockClient
|
||||
agentImpl = wasmImpl
|
||||
} else {
|
||||
nativeImpl := newMCPNative()
|
||||
nativeImpl.ClientOverride = mockClient
|
||||
agentImpl = nativeImpl
|
||||
}
|
||||
|
||||
return &MCPAgent{impl: agentImpl}
|
||||
}
|
||||
|
||||
// --- MCPAgent Method Implementations (Delegation) ---
|
||||
|
||||
func (a *MCPAgent) AgentName() string {
|
||||
return McpAgentName
|
||||
}
|
||||
|
||||
// cleanup closes existing resources (stdin, server process/module).
|
||||
// MUST be called while holding the mutex.
|
||||
func (a *MCPAgent) cleanup() {
|
||||
log.Debug(context.Background(), "Cleaning up MCP agent instance resources...")
|
||||
if a.stdin != nil {
|
||||
_ = a.stdin.Close()
|
||||
a.stdin = nil
|
||||
func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) {
|
||||
if a.impl == nil {
|
||||
return "", errors.New("MCP agent implementation is nil")
|
||||
}
|
||||
// Clean up native process if it exists
|
||||
if a.cmd != nil && a.cmd.Process != nil {
|
||||
log.Debug(context.Background(), "Killing native MCP process", "pid", a.cmd.Process.Pid)
|
||||
if err := a.cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
|
||||
log.Error(context.Background(), "Failed to kill native process", "pid", a.cmd.Process.Pid, "error", err)
|
||||
}
|
||||
// Wait might return an error if already killed/exited, ignore it.
|
||||
_ = a.cmd.Wait()
|
||||
a.cmd = nil
|
||||
}
|
||||
// Clean up WASM module instance if it exists
|
||||
if a.wasmModule != nil {
|
||||
log.Debug(context.Background(), "Closing WASM module instance")
|
||||
ctxClose, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
if err := a.wasmModule.Close(ctxClose); err != nil {
|
||||
// Ignore context deadline exceeded as it means close was successful but slow
|
||||
if !errors.Is(err, context.DeadlineExceeded) {
|
||||
log.Error(context.Background(), "Failed to close WASM module instance", "error", err)
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
a.wasmModule = nil
|
||||
}
|
||||
// Clean up compiled module ref for this instance
|
||||
if a.wasmCompiled != nil {
|
||||
log.Debug(context.Background(), "Closing compiled WASM module ref")
|
||||
// Use background context, Close should be quick
|
||||
if err := a.wasmCompiled.Close(context.Background()); err != nil {
|
||||
log.Error(context.Background(), "Failed to close compiled WASM module ref", "error", err)
|
||||
}
|
||||
a.wasmCompiled = nil
|
||||
}
|
||||
|
||||
// DO NOT close shared wasmRuntime or wasmCache here.
|
||||
|
||||
// Mark client as invalid
|
||||
a.client = nil
|
||||
return a.impl.GetArtistBiography(ctx, id, name, mbid)
|
||||
}
|
||||
|
||||
// ensureClientInitialized starts the MCP server process (native or WASM)
|
||||
// and initializes the client if needed. Attempts restart on failure.
|
||||
func (a *MCPAgent) ensureClientInitialized(ctx context.Context) (err error) {
|
||||
// --- Use override if provided (for testing) ---
|
||||
if a.ClientOverride != nil {
|
||||
a.mu.Lock()
|
||||
if a.client == nil {
|
||||
a.client = a.ClientOverride
|
||||
log.Debug(ctx, "Using provided MCP client override for testing")
|
||||
}
|
||||
a.mu.Unlock()
|
||||
return nil
|
||||
func (a *MCPAgent) GetArtistURL(ctx context.Context, id, name, mbid string) (string, error) {
|
||||
if a.impl == nil {
|
||||
return "", errors.New("MCP agent implementation is nil")
|
||||
}
|
||||
|
||||
// --- Check if already initialized (critical section) ---
|
||||
a.mu.Lock()
|
||||
if a.client != nil {
|
||||
a.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// --- Client is nil, proceed with initialization *while holding the lock* ---
|
||||
defer a.mu.Unlock()
|
||||
|
||||
log.Info(ctx, "Initializing MCP client and starting/restarting server process...", "serverPath", McpServerPath)
|
||||
|
||||
// Clean up any old resources *before* starting new ones
|
||||
a.cleanup()
|
||||
|
||||
var hostStdinWriter io.WriteCloser
|
||||
var hostStdoutReader io.ReadCloser
|
||||
var startErr error
|
||||
var isWasm bool
|
||||
|
||||
if strings.HasSuffix(McpServerPath, ".wasm") {
|
||||
isWasm = true
|
||||
// Check if shared runtime exists (it should if constructor succeeded for WASM)
|
||||
if a.wasmRuntime == nil {
|
||||
startErr = errors.New("shared Wazero runtime not initialized")
|
||||
} else {
|
||||
var mod api.Module
|
||||
var compiled api.Closer // Store compiled module ref per instance
|
||||
hostStdinWriter, hostStdoutReader, mod, compiled, startErr = a.startWasmModule(ctx)
|
||||
if startErr == nil {
|
||||
a.wasmModule = mod
|
||||
a.wasmCompiled = compiled // Store compiled ref for cleanup
|
||||
} else {
|
||||
// Ensure potential partial resources from startWasmModule are closed on error
|
||||
// startWasmModule's deferred cleanup should handle pipes and compiled module.
|
||||
// Mod instance might need closing if instantiation partially succeeded before erroring.
|
||||
if mod != nil {
|
||||
_ = mod.Close(ctx)
|
||||
}
|
||||
// Do not close shared runtime here
|
||||
}
|
||||
}
|
||||
} else {
|
||||
isWasm = false
|
||||
var nativeCmd *exec.Cmd
|
||||
hostStdinWriter, hostStdoutReader, nativeCmd, startErr = a.startNativeProcess(ctx)
|
||||
if startErr == nil {
|
||||
a.cmd = nativeCmd
|
||||
}
|
||||
}
|
||||
|
||||
if startErr != nil {
|
||||
log.Error(ctx, "Failed to start MCP server process/module", "isWasm", isWasm, "error", startErr)
|
||||
// Ensure pipes are closed if start failed (start functions might have deferred closes, but belt-and-suspenders)
|
||||
if hostStdinWriter != nil {
|
||||
_ = hostStdinWriter.Close()
|
||||
}
|
||||
if hostStdoutReader != nil {
|
||||
_ = hostStdoutReader.Close()
|
||||
}
|
||||
// a.cleanup() was already called, specific resources (cmd/wasmModule) are nil
|
||||
return fmt.Errorf("failed to start MCP server: %w", startErr)
|
||||
}
|
||||
|
||||
// --- Initialize MCP client --- (Ensure stdio transport import)
|
||||
transport := stdio.NewStdioServerTransportWithIO(hostStdoutReader, hostStdinWriter)
|
||||
clientImpl := mcp.NewClient(transport)
|
||||
|
||||
initCtx, cancel := context.WithTimeout(context.Background(), initializationTimeout)
|
||||
defer cancel()
|
||||
if _, initErr := clientImpl.Initialize(initCtx); initErr != nil {
|
||||
err = fmt.Errorf("failed to initialize MCP client: %w", initErr)
|
||||
log.Error(ctx, "MCP client initialization failed after process/module start", "isWasm", isWasm, "error", err)
|
||||
// Cleanup the newly started process/module and pipes as init failed
|
||||
a.cleanup() // This should handle cmd/wasmModule
|
||||
// Close the pipes directly as cleanup() doesn't know about them
|
||||
if hostStdinWriter != nil {
|
||||
_ = hostStdinWriter.Close()
|
||||
}
|
||||
if hostStdoutReader != nil {
|
||||
_ = hostStdoutReader.Close()
|
||||
}
|
||||
return err // defer mu.Unlock() will run
|
||||
}
|
||||
|
||||
// --- Initialization successful, update agent state (still holding lock) ---
|
||||
a.stdin = hostStdinWriter // This is the pipe the agent writes to
|
||||
a.client = clientImpl
|
||||
// cmd or wasmModule/Runtime/Compiled are already set by the start helpers
|
||||
|
||||
log.Info(ctx, "MCP client initialized successfully", "isWasm", isWasm)
|
||||
// defer mu.Unlock() runs here
|
||||
return nil // Success
|
||||
return a.impl.GetArtistURL(ctx, id, name, mbid)
|
||||
}
|
||||
|
||||
// startNativeProcess was moved to mcp_process_native.go
|
||||
|
||||
// startWasmModule loads and starts the MCP server as a WASM module using the agent's shared Wazero runtime.
|
||||
func (a *MCPAgent) startWasmModule(ctx context.Context) (hostStdinWriter io.WriteCloser, hostStdoutReader io.ReadCloser, mod api.Module, compiled api.Closer, err error) {
|
||||
log.Debug(ctx, "Loading WASM MCP server module", "path", McpServerPath)
|
||||
wasmBytes, err := os.ReadFile(McpServerPath)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, fmt.Errorf("read wasm file: %w", err)
|
||||
}
|
||||
|
||||
// Create pipes for stdio redirection
|
||||
wasmStdinReader, hostStdinWriter, err := os.Pipe()
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, fmt.Errorf("wasm stdin pipe: %w", err)
|
||||
}
|
||||
// Defer close pipes on error exit
|
||||
defer func() {
|
||||
if err != nil {
|
||||
_ = wasmStdinReader.Close()
|
||||
_ = hostStdinWriter.Close()
|
||||
// hostStdoutReader and wasmStdoutWriter handled below
|
||||
}
|
||||
}()
|
||||
|
||||
hostStdoutReader, wasmStdoutWriter, err := os.Pipe()
|
||||
if err != nil {
|
||||
_ = wasmStdinReader.Close() // Close previous pipe
|
||||
_ = hostStdinWriter.Close() // Close previous pipe
|
||||
return nil, nil, nil, nil, fmt.Errorf("wasm stdout pipe: %w", err)
|
||||
}
|
||||
// Defer close pipes on error exit
|
||||
defer func() {
|
||||
if err != nil {
|
||||
_ = hostStdoutReader.Close()
|
||||
_ = wasmStdoutWriter.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
// Use the SHARED runtime from the agent struct
|
||||
runtime, ok := a.wasmRuntime.(wazero.Runtime)
|
||||
if !ok || runtime == nil {
|
||||
return nil, nil, nil, nil, errors.New("wasmRuntime is not initialized or not a wazero.Runtime")
|
||||
}
|
||||
// WASI is already instantiated on the shared runtime
|
||||
|
||||
config := wazero.NewModuleConfig().
|
||||
WithStdin(wasmStdinReader).
|
||||
WithStdout(wasmStdoutWriter).
|
||||
WithStderr(os.Stderr).
|
||||
WithArgs(McpServerPath).
|
||||
// Grant access to the host filesystem. Needed for DNS lookup (/etc/resolv.conf)
|
||||
// and potentially other operations depending on the module.
|
||||
// SECURITY: This grants broad access; consider more restricted FS if needed.
|
||||
WithFS(os.DirFS("/"))
|
||||
|
||||
log.Debug(ctx, "Compiling WASM module (using cache if enabled)...")
|
||||
// Compile module using the shared runtime (which uses the configured cache)
|
||||
compiledModule, err := runtime.CompileModule(ctx, wasmBytes)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, fmt.Errorf("compile wasm module: %w", err)
|
||||
}
|
||||
// Defer closing compiled module in case of errors later in this function.
|
||||
// Caller (ensureClientInitialized) is responsible for closing on success.
|
||||
shouldCloseCompiledOnError := true
|
||||
defer func() {
|
||||
if shouldCloseCompiledOnError && compiledModule != nil {
|
||||
_ = compiledModule.Close(context.Background())
|
||||
}
|
||||
}()
|
||||
|
||||
log.Info(ctx, "Instantiating WASM module (will run _start)...")
|
||||
var instance api.Module
|
||||
instanceErrChan := make(chan error, 1)
|
||||
go func() {
|
||||
var instantiateErr error
|
||||
// Use context.Background() for the module's main execution context
|
||||
instance, instantiateErr = runtime.InstantiateModule(context.Background(), compiledModule, config)
|
||||
instanceErrChan <- instantiateErr
|
||||
}()
|
||||
|
||||
// Wait briefly for immediate instantiation errors
|
||||
select {
|
||||
case instantiateErr := <-instanceErrChan:
|
||||
if instantiateErr != nil {
|
||||
log.Error(ctx, "Failed to instantiate WASM module", "error", instantiateErr)
|
||||
// compiledModule closed by defer
|
||||
// pipes closed by defer
|
||||
return nil, nil, nil, nil, fmt.Errorf("instantiate wasm module: %w", instantiateErr)
|
||||
}
|
||||
// If instantiateErr is nil here, the module exited immediately without error. Log it.
|
||||
log.Warn(ctx, "WASM module instantiation returned (exited?) immediately without error.")
|
||||
// Proceed to start monitoring, but return the (already closed) instance
|
||||
// Pipes will be closed by the successful return path.
|
||||
case <-time.After(2 * time.Second):
|
||||
log.Debug(ctx, "WASM module instantiation likely blocking (server running), proceeding...")
|
||||
}
|
||||
|
||||
// Start a monitoring goroutine for WASM module exit/error
|
||||
go func(modToMonitor api.Module, compiledToClose api.Closer, errChan chan error) {
|
||||
// This will block until the instance created by InstantiateModule exits or errors.
|
||||
instantiateErr := <-errChan
|
||||
|
||||
a.mu.Lock()
|
||||
log.Warn("WASM module exited/errored", "error", instantiateErr)
|
||||
// Check if the module currently stored in the agent is the one we were monitoring.
|
||||
// Use the central cleanup which handles nil checks.
|
||||
if a.wasmModule == modToMonitor {
|
||||
a.cleanup() // This will close the module instance and compiled ref
|
||||
log.Info("MCP agent state cleaned up after WASM module exit/error")
|
||||
} else {
|
||||
// This case can happen if cleanup was called manually or if a new instance
|
||||
// was started before the old one finished exiting.
|
||||
log.Debug("WASM module exited, but state already updated or module mismatch. Explicitly closing compiled ref if needed.")
|
||||
// Manually close the compiled module ref associated with this specific instance
|
||||
// as cleanup() won't if a.wasmModule doesn't match or is nil.
|
||||
if compiledToClose != nil {
|
||||
_ = compiledToClose.Close(context.Background())
|
||||
}
|
||||
}
|
||||
a.mu.Unlock()
|
||||
}(instance, compiledModule, instanceErrChan) // Pass necessary refs
|
||||
|
||||
// Success: prevent deferred cleanup of compiled module, return resources needed by caller
|
||||
shouldCloseCompiledOnError = false
|
||||
return hostStdinWriter, hostStdoutReader, instance, compiledModule, nil // Return instance and compiled module
|
||||
}
|
||||
// Note: A Close method on MCPAgent itself isn't part of agents.Interface.
|
||||
// Cleanup of the specific implementation happens via impl.Close().
|
||||
// Cleanup of shared Wazero resources needs separate handling (e.g., on app shutdown).
|
||||
|
||||
// ArtistArgs defines the structure for MCP tool arguments requiring artist info.
|
||||
// Exported for use in tests.
|
||||
// Keep it here as it's used by both implementations indirectly via callMCPTool.
|
||||
type ArtistArgs struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Mbid string `json:"mbid,omitempty"`
|
||||
}
|
||||
|
||||
// callMCPTool is a helper to perform the common steps of calling an MCP tool.
|
||||
func (a *MCPAgent) callMCPTool(ctx context.Context, toolName string, args any) (string, error) {
|
||||
// Ensure the client is initialized and the server is running (attempts restart if needed)
|
||||
if err := a.ensureClientInitialized(ctx); err != nil {
|
||||
log.Error(ctx, "MCP agent initialization/restart failed, cannot call tool", "tool", toolName, "error", err)
|
||||
return "", fmt.Errorf("MCP agent not ready: %w", err)
|
||||
}
|
||||
|
||||
// Lock to safely access the shared client resource
|
||||
a.mu.Lock()
|
||||
|
||||
// Check if the client is valid *after* ensuring initialization and acquiring lock.
|
||||
if a.client == nil {
|
||||
a.mu.Unlock() // Release lock before returning error
|
||||
log.Error(ctx, "MCP client became invalid after initialization check (server process likely died)", "tool", toolName)
|
||||
return "", fmt.Errorf("MCP agent process is not running")
|
||||
}
|
||||
|
||||
// Keep a reference to the client while locked
|
||||
currentClient := a.client
|
||||
a.mu.Unlock() // *Release lock before* making the potentially blocking MCP call
|
||||
|
||||
// Call the tool using the client reference
|
||||
log.Debug(ctx, "Calling MCP tool", "tool", toolName, "args", args)
|
||||
response, err := currentClient.CallTool(ctx, toolName, args)
|
||||
if err != nil {
|
||||
// Handle potential pipe closures or other communication errors
|
||||
log.Error(ctx, "Failed to call MCP tool", "tool", toolName, "error", err)
|
||||
// Check if the error indicates a broken pipe, suggesting the server died
|
||||
if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") || strings.Contains(err.Error(), "EOF") {
|
||||
log.Warn(ctx, "MCP tool call failed, possibly due to server process exit. State will be reset.", "tool", toolName)
|
||||
// State reset is handled by the monitoring goroutine, just return error
|
||||
return "", fmt.Errorf("MCP agent process communication error: %w", err)
|
||||
}
|
||||
return "", fmt.Errorf("failed to call MCP tool '%s': %w", toolName, err)
|
||||
}
|
||||
|
||||
// Process the response
|
||||
if response == nil || len(response.Content) == 0 || response.Content[0].TextContent == nil || response.Content[0].TextContent.Text == "" {
|
||||
log.Warn(ctx, "MCP tool returned empty or invalid response structure", "tool", toolName)
|
||||
return "", agents.ErrNotFound
|
||||
}
|
||||
|
||||
// Check if the returned text content itself indicates an error from the MCP tool
|
||||
resultText := response.Content[0].TextContent.Text
|
||||
if strings.HasPrefix(resultText, "handler returned an error:") {
|
||||
log.Warn(ctx, "MCP tool returned an error message in its response", "tool", toolName, "mcpError", resultText)
|
||||
return "", agents.ErrNotFound // Treat MCP tool errors as "not found"
|
||||
}
|
||||
|
||||
// Return the successful text content
|
||||
log.Debug(ctx, "Received response from MCP agent", "tool", toolName, "length", len(resultText))
|
||||
return resultText, nil
|
||||
}
|
||||
|
||||
// GetArtistBiography retrieves the artist biography by calling the external MCP server.
|
||||
func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) {
|
||||
args := ArtistArgs{
|
||||
ID: id,
|
||||
Name: name,
|
||||
Mbid: mbid,
|
||||
}
|
||||
return a.callMCPTool(ctx, McpToolNameGetBio, args)
|
||||
}
|
||||
|
||||
// GetArtistURL retrieves the artist URL by calling the external MCP server.
|
||||
func (a *MCPAgent) GetArtistURL(ctx context.Context, id, name, mbid string) (string, error) {
|
||||
args := ArtistArgs{
|
||||
ID: id,
|
||||
Name: name,
|
||||
Mbid: mbid,
|
||||
}
|
||||
return a.callMCPTool(ctx, McpToolNameGetURL, args)
|
||||
}
|
||||
|
||||
// Ensure MCPAgent implements the required interfaces
|
||||
// Ensure MCPAgent still fulfills the registration requirements indirectly
|
||||
var _ agents.ArtistBiographyRetriever = (*MCPAgent)(nil)
|
||||
var _ agents.ArtistURLRetriever = (*MCPAgent)(nil)
|
||||
|
||||
func init() {
|
||||
// Register the real constructor, not the test one
|
||||
agents.Register(McpAgentName, mcpConstructor)
|
||||
}
|
||||
|
||||
// Core logic moved to implementations.
|
||||
|
@ -13,8 +13,11 @@ import (
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// mcpClient defines the interface for the MCP client methods used by the agent.
|
||||
// This allows mocking the client for testing.
|
||||
// Use the exported mcpClient interface from the mcp package
|
||||
// type internalMCPClient = mcp.InternalMCPClient // Assuming you export the interface // REMOVE THIS LINE
|
||||
|
||||
// Define the mcpClient interface locally for mocking, matching the one
|
||||
// used internally by MCPNative/MCPWasm.
|
||||
type mcpClient interface {
|
||||
Initialize(ctx context.Context) (*mcp_client.InitializeResponse, error)
|
||||
CallTool(ctx context.Context, toolName string, args any) (*mcp_client.ToolResponse, error)
|
||||
@ -44,13 +47,15 @@ func (m *mockMCPClient) CallTool(ctx context.Context, toolName string, args any)
|
||||
return &mcp_client.ToolResponse{}, nil
|
||||
}
|
||||
|
||||
// Ensure mock implements the interface (compile-time check)
|
||||
// Ensure mock implements the local interface (compile-time check)
|
||||
var _ mcpClient = (*mockMCPClient)(nil)
|
||||
|
||||
var _ = Describe("MCPAgent", func() {
|
||||
var (
|
||||
ctx context.Context
|
||||
agent *mcp.MCPAgent // Use concrete type from the package
|
||||
ctx context.Context
|
||||
// We test the public MCPAgent wrapper, which uses the implementations internally.
|
||||
// The actual agent instance might be native or wasm depending on McpServerPath
|
||||
agent agents.Interface // Use the public agents.Interface
|
||||
mockClient *mockMCPClient
|
||||
)
|
||||
|
||||
@ -60,12 +65,19 @@ var _ = Describe("MCPAgent", func() {
|
||||
callToolArgs: make([]any, 0), // Reset args on each test
|
||||
}
|
||||
|
||||
// Instantiate the real agent
|
||||
agent = &mcp.MCPAgent{}
|
||||
// Inject the mock client directly using the exported override field
|
||||
agent.ClientOverride = mockClient
|
||||
// Instantiate the real agent using a testing constructor
|
||||
// This constructor needs to be added to the mcp package.
|
||||
agent = mcp.NewAgentForTesting(mockClient)
|
||||
Expect(agent).NotTo(BeNil(), "Agent should be created")
|
||||
})
|
||||
|
||||
// Helper to get the concrete agent type for calling specific methods
|
||||
getConcreteAgent := func() *mcp.MCPAgent {
|
||||
concreteAgent, ok := agent.(*mcp.MCPAgent)
|
||||
Expect(ok).To(BeTrue(), "Agent should be of type *mcp.MCPAgent")
|
||||
return concreteAgent
|
||||
}
|
||||
|
||||
Describe("GetArtistBiography", func() {
|
||||
It("should call the correct tool and return the biography", func() {
|
||||
expectedBio := "This is the artist bio."
|
||||
@ -79,7 +91,7 @@ var _ = Describe("MCPAgent", func() {
|
||||
return mcp_client.NewToolResponse(mcp_client.NewTextContent(expectedBio)), nil
|
||||
}
|
||||
|
||||
bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(bio).To(Equal(expectedBio))
|
||||
})
|
||||
@ -90,10 +102,16 @@ var _ = Describe("MCPAgent", func() {
|
||||
return nil, expectedErr
|
||||
}
|
||||
|
||||
bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
Expect(err).To(HaveOccurred())
|
||||
// The error originates from the implementation now, check for specific part
|
||||
Expect(err.Error()).To(SatisfyAny(
|
||||
ContainSubstring("native MCP agent not ready"), // Error from native
|
||||
ContainSubstring("WASM MCP agent not ready"), // Error from WASM
|
||||
ContainSubstring("failed to call native MCP tool"),
|
||||
ContainSubstring("failed to call WASM MCP tool"),
|
||||
))
|
||||
Expect(errors.Is(err, expectedErr)).To(BeTrue())
|
||||
Expect(err.Error()).To(ContainSubstring(fmt.Sprintf("failed to call MCP tool '%s'", mcp.McpToolNameGetBio)))
|
||||
Expect(bio).To(BeEmpty())
|
||||
})
|
||||
|
||||
@ -103,7 +121,7 @@ var _ = Describe("MCPAgent", func() {
|
||||
return mcp_client.NewToolResponse(), nil
|
||||
}
|
||||
|
||||
bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
Expect(err).To(MatchError(agents.ErrNotFound))
|
||||
Expect(bio).To(BeEmpty())
|
||||
})
|
||||
@ -114,7 +132,7 @@ var _ = Describe("MCPAgent", func() {
|
||||
return mcp_client.NewToolResponse(mcp_client.NewTextContent("")), nil
|
||||
}
|
||||
|
||||
bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
Expect(err).To(MatchError(agents.ErrNotFound))
|
||||
Expect(bio).To(BeEmpty())
|
||||
})
|
||||
@ -124,9 +142,12 @@ var _ = Describe("MCPAgent", func() {
|
||||
return nil, io.ErrClosedPipe
|
||||
}
|
||||
|
||||
bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("MCP agent process communication error"))
|
||||
Expect(err.Error()).To(SatisfyAny(
|
||||
ContainSubstring("native MCP agent process communication error"),
|
||||
ContainSubstring("WASM MCP agent module communication error"),
|
||||
))
|
||||
Expect(errors.Is(err, io.ErrClosedPipe)).To(BeTrue())
|
||||
Expect(bio).To(BeEmpty())
|
||||
})
|
||||
@ -137,7 +158,7 @@ var _ = Describe("MCPAgent", func() {
|
||||
return mcp_client.NewToolResponse(mcp_client.NewTextContent(mcpErrorString)), nil
|
||||
}
|
||||
|
||||
bio, err := agent.GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
bio, err := getConcreteAgent().GetArtistBiography(ctx, "id1", "Artist Name", "mbid1")
|
||||
Expect(err).To(MatchError(agents.ErrNotFound))
|
||||
Expect(bio).To(BeEmpty())
|
||||
})
|
||||
@ -156,7 +177,7 @@ var _ = Describe("MCPAgent", func() {
|
||||
return mcp_client.NewToolResponse(mcp_client.NewTextContent(expectedURL)), nil
|
||||
}
|
||||
|
||||
url, err := agent.GetArtistURL(ctx, "id2", "Another Artist", "mbid2")
|
||||
url, err := getConcreteAgent().GetArtistURL(ctx, "id2", "Another Artist", "mbid2")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(url).To(Equal(expectedURL))
|
||||
})
|
||||
@ -167,10 +188,15 @@ var _ = Describe("MCPAgent", func() {
|
||||
return nil, expectedErr
|
||||
}
|
||||
|
||||
url, err := agent.GetArtistURL(ctx, "id2", "Another Artist", "mbid2")
|
||||
url, err := getConcreteAgent().GetArtistURL(ctx, "id2", "Another Artist", "mbid2")
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(SatisfyAny(
|
||||
ContainSubstring("native MCP agent not ready"), // Error from native
|
||||
ContainSubstring("WASM MCP agent not ready"), // Error from WASM
|
||||
ContainSubstring("failed to call native MCP tool"),
|
||||
ContainSubstring("failed to call WASM MCP tool"),
|
||||
))
|
||||
Expect(errors.Is(err, expectedErr)).To(BeTrue())
|
||||
Expect(err.Error()).To(ContainSubstring(fmt.Sprintf("failed to call MCP tool '%s'", mcp.McpToolNameGetURL)))
|
||||
Expect(url).To(BeEmpty())
|
||||
})
|
||||
|
||||
@ -180,7 +206,7 @@ var _ = Describe("MCPAgent", func() {
|
||||
return mcp_client.NewToolResponse(), nil
|
||||
}
|
||||
|
||||
url, err := agent.GetArtistURL(ctx, "id2", "Another Artist", "mbid2")
|
||||
url, err := getConcreteAgent().GetArtistURL(ctx, "id2", "Another Artist", "mbid2")
|
||||
Expect(err).To(MatchError(agents.ErrNotFound))
|
||||
Expect(url).To(BeEmpty())
|
||||
})
|
||||
@ -190,9 +216,12 @@ var _ = Describe("MCPAgent", func() {
|
||||
return nil, fmt.Errorf("write: %w", io.ErrClosedPipe)
|
||||
}
|
||||
|
||||
url, err := agent.GetArtistURL(ctx, "id2", "Another Artist", "mbid2")
|
||||
url, err := getConcreteAgent().GetArtistURL(ctx, "id2", "Another Artist", "mbid2")
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("MCP agent process communication error"))
|
||||
Expect(err.Error()).To(SatisfyAny(
|
||||
ContainSubstring("native MCP agent process communication error"),
|
||||
ContainSubstring("WASM MCP agent module communication error"),
|
||||
))
|
||||
Expect(errors.Is(err, io.ErrClosedPipe)).To(BeTrue())
|
||||
Expect(url).To(BeEmpty())
|
||||
})
|
||||
@ -203,7 +232,7 @@ var _ = Describe("MCPAgent", func() {
|
||||
return mcp_client.NewToolResponse(mcp_client.NewTextContent(mcpErrorString)), nil
|
||||
}
|
||||
|
||||
url, err := agent.GetArtistURL(ctx, "id2", "Another Artist", "mbid2")
|
||||
url, err := getConcreteAgent().GetArtistURL(ctx, "id2", "Another Artist", "mbid2")
|
||||
Expect(err).To(MatchError(agents.ErrNotFound))
|
||||
Expect(url).To(BeEmpty())
|
||||
})
|
||||
|
@ -2,58 +2,250 @@ package mcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
mcp "github.com/metoro-io/mcp-golang"
|
||||
"github.com/metoro-io/mcp-golang/transport/stdio"
|
||||
"github.com/navidrome/navidrome/core/agents"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
)
|
||||
|
||||
// startNativeProcess starts the MCP server as a native executable.
|
||||
func (a *MCPAgent) startNativeProcess(ctx context.Context) (stdin io.WriteCloser, stdout io.ReadCloser, cmd *exec.Cmd, err error) {
|
||||
log.Debug(ctx, "Starting native MCP server process", "path", McpServerPath)
|
||||
cmd = exec.CommandContext(context.Background(), McpServerPath) // Use Background context for long-running process
|
||||
// MCPNative implements the mcpImplementation interface for running the MCP server as a native process.
|
||||
type MCPNative struct {
|
||||
mu sync.Mutex
|
||||
cmd *exec.Cmd // Stores the running command
|
||||
stdin io.WriteCloser
|
||||
client mcpClient
|
||||
|
||||
stdin, err = cmd.StdinPipe()
|
||||
// ClientOverride allows injecting a mock client for testing this specific implementation.
|
||||
ClientOverride mcpClient // TODO: Consider if this is the best way to test
|
||||
}
|
||||
|
||||
// newMCPNative creates a new instance of the native MCP agent implementation.
|
||||
func newMCPNative() *MCPNative {
|
||||
return &MCPNative{}
|
||||
}
|
||||
|
||||
// --- mcpImplementation interface methods ---
|
||||
|
||||
func (n *MCPNative) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) {
|
||||
args := ArtistArgs{
|
||||
ID: id,
|
||||
Name: name,
|
||||
Mbid: mbid,
|
||||
}
|
||||
return n.callMCPTool(ctx, McpToolNameGetBio, args)
|
||||
}
|
||||
|
||||
func (n *MCPNative) GetArtistURL(ctx context.Context, id, name, mbid string) (string, error) {
|
||||
args := ArtistArgs{
|
||||
ID: id,
|
||||
Name: name,
|
||||
Mbid: mbid,
|
||||
}
|
||||
return n.callMCPTool(ctx, McpToolNameGetURL, args)
|
||||
}
|
||||
|
||||
func (n *MCPNative) Close() error {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
n.cleanupResources_locked()
|
||||
return nil // Currently, cleanup doesn't return errors
|
||||
}
|
||||
|
||||
// --- Internal Helper Methods ---
|
||||
|
||||
// ensureClientInitialized starts the MCP server process and initializes the client if needed.
|
||||
// MUST be called with the mutex HELD.
|
||||
func (n *MCPNative) ensureClientInitialized_locked(ctx context.Context) error {
|
||||
// Use override if provided (for testing)
|
||||
if n.ClientOverride != nil {
|
||||
if n.client == nil {
|
||||
n.client = n.ClientOverride
|
||||
log.Debug(ctx, "Using provided MCP client override for native testing")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if already initialized
|
||||
if n.client != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Info(ctx, "Initializing Native MCP client and starting/restarting server process...", "serverPath", McpServerPath)
|
||||
|
||||
// Clean up any old resources *before* starting new ones
|
||||
n.cleanupResources_locked()
|
||||
|
||||
hostStdinWriter, hostStdoutReader, nativeCmd, startErr := n.startProcess_locked(ctx)
|
||||
if startErr != nil {
|
||||
log.Error(ctx, "Failed to start Native MCP server process", "error", startErr)
|
||||
// Ensure pipes are closed if start failed (startProcess might handle this, but be sure)
|
||||
if hostStdinWriter != nil {
|
||||
_ = hostStdinWriter.Close()
|
||||
}
|
||||
if hostStdoutReader != nil {
|
||||
_ = hostStdoutReader.Close()
|
||||
}
|
||||
return fmt.Errorf("failed to start native MCP server: %w", startErr)
|
||||
}
|
||||
|
||||
// --- Initialize MCP client ---
|
||||
transport := stdio.NewStdioServerTransportWithIO(hostStdoutReader, hostStdinWriter)
|
||||
clientImpl := mcp.NewClient(transport)
|
||||
|
||||
initCtx, cancel := context.WithTimeout(context.Background(), initializationTimeout)
|
||||
defer cancel()
|
||||
if _, initErr := clientImpl.Initialize(initCtx); initErr != nil {
|
||||
err := fmt.Errorf("failed to initialize native MCP client: %w", initErr)
|
||||
log.Error(ctx, "Native MCP client initialization failed", "error", err)
|
||||
// Cleanup the newly started process and close pipes
|
||||
n.cmd = nativeCmd // Temporarily set cmd so cleanup can kill it
|
||||
n.cleanupResources_locked()
|
||||
if hostStdinWriter != nil {
|
||||
_ = hostStdinWriter.Close()
|
||||
}
|
||||
if hostStdoutReader != nil {
|
||||
_ = hostStdoutReader.Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// --- Initialization successful, update agent state ---
|
||||
n.cmd = nativeCmd
|
||||
n.stdin = hostStdinWriter // This is the pipe the agent writes to
|
||||
n.client = clientImpl
|
||||
|
||||
log.Info(ctx, "Native MCP client initialized successfully", "pid", n.cmd.Process.Pid)
|
||||
return nil // Success
|
||||
}
|
||||
|
||||
// callMCPTool handles ensuring initialization and calling the MCP tool.
|
||||
func (n *MCPNative) callMCPTool(ctx context.Context, toolName string, args any) (string, error) {
|
||||
// Ensure the client is initialized and the server is running (attempts restart if needed)
|
||||
n.mu.Lock()
|
||||
err := n.ensureClientInitialized_locked(ctx)
|
||||
if err != nil {
|
||||
n.mu.Unlock()
|
||||
log.Error(ctx, "Native MCP agent initialization/restart failed", "tool", toolName, "error", err)
|
||||
return "", fmt.Errorf("native MCP agent not ready: %w", err)
|
||||
}
|
||||
|
||||
// Keep a reference to the client while locked
|
||||
currentClient := n.client
|
||||
// Unlock mutex *before* making the potentially blocking MCP call
|
||||
n.mu.Unlock()
|
||||
|
||||
// Call the tool using the client reference
|
||||
log.Debug(ctx, "Calling Native MCP tool", "tool", toolName, "args", args)
|
||||
response, callErr := currentClient.CallTool(ctx, toolName, args)
|
||||
if callErr != nil {
|
||||
// Handle potential pipe closures or other communication errors
|
||||
log.Error(ctx, "Failed to call Native MCP tool", "tool", toolName, "error", callErr)
|
||||
// Check if the error indicates a broken pipe, suggesting the server died
|
||||
// The monitoring goroutine will handle cleanup, just return error here.
|
||||
if errors.Is(callErr, io.ErrClosedPipe) || strings.Contains(callErr.Error(), "broken pipe") || strings.Contains(callErr.Error(), "EOF") {
|
||||
log.Warn(ctx, "Native MCP tool call failed, possibly due to server process exit.", "tool", toolName)
|
||||
// No need to explicitly call cleanup, monitoring goroutine handles it.
|
||||
return "", fmt.Errorf("native MCP agent process communication error: %w", callErr)
|
||||
}
|
||||
return "", fmt.Errorf("failed to call native MCP tool '%s': %w", toolName, callErr)
|
||||
}
|
||||
|
||||
// Process the response (same logic as before)
|
||||
if response == nil || len(response.Content) == 0 || response.Content[0].TextContent == nil || response.Content[0].TextContent.Text == "" {
|
||||
log.Warn(ctx, "Native MCP tool returned empty/invalid response", "tool", toolName)
|
||||
// Treat as not found for agent interface consistency
|
||||
return "", agents.ErrNotFound // Import agents package if needed, or define locally
|
||||
}
|
||||
resultText := response.Content[0].TextContent.Text
|
||||
if strings.HasPrefix(resultText, "handler returned an error:") {
|
||||
log.Warn(ctx, "Native MCP tool returned an error message", "tool", toolName, "mcpError", resultText)
|
||||
return "", agents.ErrNotFound // Treat MCP tool errors as "not found"
|
||||
}
|
||||
|
||||
log.Debug(ctx, "Received response from Native MCP agent", "tool", toolName, "length", len(resultText))
|
||||
return resultText, nil
|
||||
}
|
||||
|
||||
// cleanupResources closes existing resources (stdin, server process).
|
||||
// MUST be called with the mutex HELD.
|
||||
func (n *MCPNative) cleanupResources_locked() {
|
||||
log.Debug(context.Background(), "Cleaning up Native MCP instance resources...")
|
||||
if n.stdin != nil {
|
||||
_ = n.stdin.Close()
|
||||
n.stdin = nil
|
||||
}
|
||||
if n.cmd != nil && n.cmd.Process != nil {
|
||||
pid := n.cmd.Process.Pid
|
||||
log.Debug(context.Background(), "Killing native MCP process", "pid", pid)
|
||||
// Kill the process. Ignore error if it's already done.
|
||||
if err := n.cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
|
||||
log.Error(context.Background(), "Failed to kill native process", "pid", pid, "error", err)
|
||||
}
|
||||
// Wait for the process to release resources. Ignore error.
|
||||
_ = n.cmd.Wait()
|
||||
n.cmd = nil
|
||||
}
|
||||
// Mark client as invalid
|
||||
n.client = nil
|
||||
}
|
||||
|
||||
// startProcess starts the MCP server as a native executable and sets up monitoring.
|
||||
// MUST be called with the mutex HELD.
|
||||
func (n *MCPNative) startProcess_locked(ctx context.Context) (stdin io.WriteCloser, stdout io.ReadCloser, cmd *exec.Cmd, err error) {
|
||||
log.Debug(ctx, "Starting native MCP server process", "path", McpServerPath)
|
||||
// Use Background context for the command itself, as it should outlive the request context (ctx)
|
||||
cmd = exec.CommandContext(context.Background(), McpServerPath)
|
||||
|
||||
stdinPipe, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("native stdin pipe: %w", err)
|
||||
}
|
||||
|
||||
stdout, err = cmd.StdoutPipe()
|
||||
stdoutPipe, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
_ = stdin.Close()
|
||||
_ = stdinPipe.Close()
|
||||
return nil, nil, nil, fmt.Errorf("native stdout pipe: %w", err)
|
||||
}
|
||||
|
||||
var stderr strings.Builder
|
||||
cmd.Stderr = &stderr
|
||||
var stderrBuf strings.Builder
|
||||
cmd.Stderr = &stderrBuf
|
||||
|
||||
if err = cmd.Start(); err != nil {
|
||||
_ = stdin.Close()
|
||||
_ = stdout.Close()
|
||||
_ = stdinPipe.Close()
|
||||
_ = stdoutPipe.Close()
|
||||
return nil, nil, nil, fmt.Errorf("native start: %w", err)
|
||||
}
|
||||
|
||||
currentPid := cmd.Process.Pid
|
||||
currentCmd := cmd // Capture the current cmd pointer for the goroutine
|
||||
log.Info(ctx, "Native MCP server process started", "pid", currentPid)
|
||||
|
||||
// Start monitoring goroutine
|
||||
go func(processCmd *exec.Cmd, processStderr *strings.Builder, processPid int) {
|
||||
waitErr := processCmd.Wait() // Wait for the process to exit
|
||||
a.mu.Lock()
|
||||
log.Warn("Native MCP server process exited", "pid", processPid, "error", waitErr, "stderr", processStderr.String())
|
||||
// Check if the cmd matches the one we are monitoring before cleaning up
|
||||
// Use the central cleanup function which handles nil checks.
|
||||
if a.cmd == processCmd {
|
||||
a.cleanup() // Use the central cleanup function
|
||||
log.Info("MCP agent state cleaned up after native process exit", "pid", processPid)
|
||||
go func() {
|
||||
waitErr := currentCmd.Wait() // Wait for the specific process this goroutine monitors
|
||||
n.mu.Lock()
|
||||
stderrStr := stderrBuf.String()
|
||||
log.Warn("Native MCP server process exited", "pid", currentPid, "error", waitErr, "stderr", stderrStr)
|
||||
|
||||
// Critical: Check if the agent's current command is STILL the one we were monitoring.
|
||||
// If it's different, it means cleanup/restart already happened, so we shouldn't cleanup again.
|
||||
if n.cmd == currentCmd {
|
||||
n.cleanupResources_locked() // Use the locked version as we hold the lock
|
||||
log.Info("MCP Native agent state cleaned up after process exit", "pid", currentPid)
|
||||
} else {
|
||||
log.Debug("Native MCP agent process exited, but state already updated or cmd mismatch", "exitedPid", processPid)
|
||||
log.Debug("Native MCP process exited, but state already updated/cmd mismatch", "exitedPid", currentPid)
|
||||
}
|
||||
a.mu.Unlock()
|
||||
}(cmd, &stderr, currentPid)
|
||||
n.mu.Unlock()
|
||||
}()
|
||||
|
||||
// Return the pipes connected to the process and the Cmd object
|
||||
return stdin, stdout, cmd, nil
|
||||
return stdinPipe, stdoutPipe, cmd, nil
|
||||
}
|
||||
|
352  core/agents/mcp/mcp_process_wazero.go  Normal file
@ -0,0 +1,352 @@
|
||||
package mcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
mcp "github.com/metoro-io/mcp-golang"
|
||||
"github.com/metoro-io/mcp-golang/transport/stdio"
|
||||
"github.com/navidrome/navidrome/core/agents" // Needed for ErrNotFound
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/tetratelabs/wazero" // Needed for types
|
||||
"github.com/tetratelabs/wazero/api" // Needed for types
|
||||
)
|
||||
|
||||
// MCPWasm implements the mcpImplementation interface for running the MCP server as a WASM module.
|
||||
type MCPWasm struct {
|
||||
mu sync.Mutex
|
||||
wasmModule api.Module // Stores the instantiated module
|
||||
wasmCompiled api.Closer // Stores the compiled module reference for this instance
|
||||
stdin io.WriteCloser
|
||||
client mcpClient
|
||||
|
||||
// Shared resources (passed in, not owned by this struct)
|
||||
wasmRuntime api.Closer // Closer for the shared Wazero Runtime
|
||||
wasmCache wazero.CompilationCache // Shared Compilation Cache (can be nil)
|
||||
|
||||
// ClientOverride allows injecting a mock client for testing this specific implementation.
|
||||
ClientOverride mcpClient // TODO: Consider if this is the best way to test
|
||||
}
|
||||
|
||||
// newMCPWasm creates a new instance of the WASM MCP agent implementation.
|
||||
func newMCPWasm(runtime api.Closer, cache wazero.CompilationCache) *MCPWasm {
|
||||
return &MCPWasm{
|
||||
wasmRuntime: runtime,
|
||||
wasmCache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
// --- mcpImplementation interface methods ---
|
||||
|
||||
func (w *MCPWasm) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) {
|
||||
args := ArtistArgs{
|
||||
ID: id,
|
||||
Name: name,
|
||||
Mbid: mbid,
|
||||
}
|
||||
return w.callMCPTool(ctx, McpToolNameGetBio, args)
|
||||
}
|
||||
|
||||
func (w *MCPWasm) GetArtistURL(ctx context.Context, id, name, mbid string) (string, error) {
|
||||
args := ArtistArgs{
|
||||
ID: id,
|
||||
Name: name,
|
||||
Mbid: mbid,
|
||||
}
|
||||
return w.callMCPTool(ctx, McpToolNameGetURL, args)
|
||||
}
|
||||
|
||||
// Close cleans up instance-specific WASM resources.
|
||||
// It does NOT close the shared runtime or cache.
|
||||
func (w *MCPWasm) Close() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
w.cleanupResources_locked()
|
||||
return nil // Currently, cleanup doesn't return errors
|
||||
}
|
||||
|
||||
// --- Internal Helper Methods ---
|
||||
|
||||
// ensureClientInitialized starts the MCP WASM module and initializes the client if needed.
|
||||
// MUST be called with the mutex HELD.
|
||||
func (w *MCPWasm) ensureClientInitialized_locked(ctx context.Context) error {
|
||||
// Use override if provided (for testing)
|
||||
if w.ClientOverride != nil {
|
||||
if w.client == nil {
|
||||
w.client = w.ClientOverride
|
||||
log.Debug(ctx, "Using provided MCP client override for WASM testing")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if already initialized
|
||||
if w.client != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Info(ctx, "Initializing WASM MCP client and starting/restarting server module...", "serverPath", McpServerPath)
|
||||
|
||||
// Clean up any old instance resources *before* starting new ones
|
||||
w.cleanupResources_locked()
|
||||
|
||||
// Check if shared runtime exists (it should if constructor succeeded)
|
||||
if w.wasmRuntime == nil {
|
||||
return errors.New("shared Wazero runtime not initialized for MCPWasm")
|
||||
}
|
||||
|
||||
hostStdinWriter, hostStdoutReader, mod, compiled, startErr := w.startModule_locked(ctx)
|
||||
if startErr != nil {
|
||||
log.Error(ctx, "Failed to start WASM MCP server module", "error", startErr)
|
||||
// Ensure pipes are closed if start failed
|
||||
if hostStdinWriter != nil {
|
||||
_ = hostStdinWriter.Close()
|
||||
}
|
||||
if hostStdoutReader != nil {
|
||||
_ = hostStdoutReader.Close()
|
||||
}
|
||||
// startModule_locked handles cleanup of mod/compiled on error
|
||||
return fmt.Errorf("failed to start WASM MCP server: %w", startErr)
|
||||
}
|
||||
|
||||
// --- Initialize MCP client ---
|
||||
transport := stdio.NewStdioServerTransportWithIO(hostStdoutReader, hostStdinWriter)
|
||||
clientImpl := mcp.NewClient(transport)
|
||||
|
||||
initCtx, cancel := context.WithTimeout(context.Background(), initializationTimeout)
|
||||
defer cancel()
|
||||
if _, initErr := clientImpl.Initialize(initCtx); initErr != nil {
|
||||
err := fmt.Errorf("failed to initialize WASM MCP client: %w", initErr)
|
||||
log.Error(ctx, "WASM MCP client initialization failed", "error", err)
|
||||
// Cleanup the newly started module and close pipes
|
||||
w.wasmModule = mod // Temporarily set so cleanup can close it
|
||||
w.wasmCompiled = compiled // Temporarily set so cleanup can close it
|
||||
w.cleanupResources_locked()
|
||||
if hostStdinWriter != nil {
|
||||
_ = hostStdinWriter.Close()
|
||||
}
|
||||
if hostStdoutReader != nil {
|
||||
_ = hostStdoutReader.Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// --- Initialization successful, update agent state ---
|
||||
w.wasmModule = mod
|
||||
w.wasmCompiled = compiled
|
||||
w.stdin = hostStdinWriter // This is the pipe the agent writes to
|
||||
w.client = clientImpl
|
||||
|
||||
log.Info(ctx, "WASM MCP client initialized successfully")
|
||||
return nil // Success
|
||||
}
|
||||
|
||||
// callMCPTool handles ensuring initialization and calling the MCP tool.
func (w *MCPWasm) callMCPTool(ctx context.Context, toolName string, args any) (string, error) {
	// Ensure the client is initialized and the server is running (attempts restart if needed)
	w.mu.Lock()
	err := w.ensureClientInitialized_locked(ctx)
	if err != nil {
		w.mu.Unlock()
		log.Error(ctx, "WASM MCP agent initialization/restart failed", "tool", toolName, "error", err)
		return "", fmt.Errorf("WASM MCP agent not ready: %w", err)
	}

	// Keep a reference to the client while locked
	currentClient := w.client
	// Unlock mutex *before* making the potentially blocking MCP call
	w.mu.Unlock()

	// Call the tool using the client reference
	log.Debug(ctx, "Calling WASM MCP tool", "tool", toolName, "args", args)
	response, callErr := currentClient.CallTool(ctx, toolName, args)
	if callErr != nil {
		// Handle potential pipe closures or other communication errors
		log.Error(ctx, "Failed to call WASM MCP tool", "tool", toolName, "error", callErr)
		// Check if the error indicates a broken pipe, suggesting the server died.
		// The monitoring goroutine will handle cleanup, just return error here.
		if errors.Is(callErr, io.ErrClosedPipe) || strings.Contains(callErr.Error(), "broken pipe") || strings.Contains(callErr.Error(), "EOF") {
			log.Warn(ctx, "WASM MCP tool call failed, possibly due to server module exit.", "tool", toolName)
			// No need to explicitly call cleanup, monitoring goroutine handles it.
			return "", fmt.Errorf("WASM MCP agent module communication error: %w", callErr)
		}
		return "", fmt.Errorf("failed to call WASM MCP tool '%s': %w", toolName, callErr)
	}

	// Process the response (same logic as native)
	if response == nil || len(response.Content) == 0 || response.Content[0].TextContent == nil || response.Content[0].TextContent.Text == "" {
		log.Warn(ctx, "WASM MCP tool returned empty/invalid response", "tool", toolName)
		return "", agents.ErrNotFound
	}
	resultText := response.Content[0].TextContent.Text
	if strings.HasPrefix(resultText, "handler returned an error:") {
		log.Warn(ctx, "WASM MCP tool returned an error message", "tool", toolName, "mcpError", resultText)
		return "", agents.ErrNotFound // Treat MCP tool errors as "not found"
	}

	log.Debug(ctx, "Received response from WASM MCP agent", "tool", toolName, "length", len(resultText))
	return resultText, nil
}
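
// Illustrative sketch (not part of this file): an agent-facing method would
// typically package its arguments into a struct and delegate to callMCPTool
// with one of the exported tool-name constants. The argument struct and its
// field names below are assumptions for illustration only.
//
//	type artistArgs struct {
//		Name string `json:"name"`
//		Mbid string `json:"mbid"`
//	}
//
//	func (w *MCPWasm) getArtistBiography(ctx context.Context, name, mbid string) (string, error) {
//		return w.callMCPTool(ctx, McpToolNameGetBio, artistArgs{Name: name, Mbid: mbid})
//	}
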
// cleanupResources_locked closes instance-specific WASM resources (stdin, module, compiled ref).
// It specifically avoids closing the shared runtime or cache.
// MUST be called with the mutex HELD.
func (w *MCPWasm) cleanupResources_locked() {
	log.Debug(context.Background(), "Cleaning up WASM MCP instance resources...")
	if w.stdin != nil {
		_ = w.stdin.Close()
		w.stdin = nil
	}
	// Close the module instance
	if w.wasmModule != nil {
		log.Debug(context.Background(), "Closing WASM module instance")
		ctxClose, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		if err := w.wasmModule.Close(ctxClose); err != nil && !errors.Is(err, context.DeadlineExceeded) {
			log.Error(context.Background(), "Failed to close WASM module instance", "error", err)
		}
		cancel()
		w.wasmModule = nil
	}
	// Close the compiled module reference for this instance
	if w.wasmCompiled != nil {
		log.Debug(context.Background(), "Closing compiled WASM module ref")
		if err := w.wasmCompiled.Close(context.Background()); err != nil {
			log.Error(context.Background(), "Failed to close compiled WASM module ref", "error", err)
		}
		w.wasmCompiled = nil
	}
	// Mark client as invalid
	w.client = nil
	// DO NOT CLOSE w.wasmRuntime or w.wasmCache here!
}
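
// For context: the shared runtime referenced below (w.wasmRuntime) is created
// up front, with WASI and any host functions already instantiated and a
// compilation cache (w.wasmCache) attached. A minimal sketch of that setup,
// assuming wazero's standard APIs (the real construction code lives elsewhere
// and may differ):
//
//	cache := wazero.NewCompilationCache()
//	runtimeCfg := wazero.NewRuntimeConfig().WithCompilationCache(cache)
//	r := wazero.NewRuntimeWithConfig(ctx, runtimeCfg)
//	wasi_snapshot_preview1.MustInstantiate(ctx, r)
//	// r is then stored as w.wasmRuntime and reused across module restarts.
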
// startModule_locked loads and starts the MCP server as a WASM module.
// MUST be called with the mutex HELD.
func (w *MCPWasm) startModule_locked(ctx context.Context) (hostStdinWriter io.WriteCloser, hostStdoutReader io.ReadCloser, mod api.Module, compiled api.Closer, err error) {
	log.Debug(ctx, "Loading WASM MCP server module", "path", McpServerPath)
	wasmBytes, err := os.ReadFile(McpServerPath)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("read wasm file: %w", err)
	}

	// Create pipes for stdio redirection
	wasmStdinReader, hostStdinWriter, err := os.Pipe()
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("wasm stdin pipe: %w", err)
	}
	// Defer closing pipes on error exit
	shouldClosePipesOnError := true
	defer func() {
		if shouldClosePipesOnError {
			if wasmStdinReader != nil {
				_ = wasmStdinReader.Close()
			}
			if hostStdinWriter != nil {
				_ = hostStdinWriter.Close()
			}
			// hostStdoutReader and wasmStdoutWriter handled below
		}
	}()

	hostStdoutReader, wasmStdoutWriter, err := os.Pipe()
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("wasm stdout pipe: %w", err)
	}
	// Defer closing pipes on error exit
	defer func() {
		if shouldClosePipesOnError {
			if hostStdoutReader != nil {
				_ = hostStdoutReader.Close()
			}
			if wasmStdoutWriter != nil {
				_ = wasmStdoutWriter.Close()
			}
		}
	}()

	// Use the SHARED runtime from the agent struct
	runtime, ok := w.wasmRuntime.(wazero.Runtime)
	if !ok || runtime == nil {
		return nil, nil, nil, nil, errors.New("wasmRuntime is not initialized or not a wazero.Runtime")
	}

	// Prepare module configuration.
	// Host functions and WASI are already part of the shared runtime.
	config := wazero.NewModuleConfig().
		WithStdin(wasmStdinReader).
		WithStdout(wasmStdoutWriter).
		WithStderr(os.Stderr).
		WithArgs(McpServerPath).
		WithFS(os.DirFS("/")) // Keep FS access for now

	log.Debug(ctx, "Compiling WASM module (using cache if enabled)...")
	// Compile the module using the shared runtime
	compiledModule, err := runtime.CompileModule(ctx, wasmBytes)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("compile wasm module: %w", err)
	}
	// Defer closing the compiled module only if an error occurs later in this function.
	shouldCloseCompiledOnError := true
	defer func() {
		if shouldCloseCompiledOnError && compiledModule != nil {
			_ = compiledModule.Close(context.Background())
		}
	}()

	log.Info(ctx, "Instantiating WASM module (will run _start)...")
	var instance api.Module
	instanceErrChan := make(chan error, 1)
	go func() {
		var instantiateErr error
		// Use context.Background() for the module's main execution context
		instance, instantiateErr = runtime.InstantiateModule(context.Background(), compiledModule, config)
		instanceErrChan <- instantiateErr
	}()

	// Wait briefly for immediate instantiation errors
	select {
	case instantiateErr := <-instanceErrChan:
		if instantiateErr != nil {
			log.Error(ctx, "Failed to instantiate WASM module", "error", instantiateErr)
			// compiledModule and pipes are closed by the deferred cleanups above
			return nil, nil, nil, nil, fmt.Errorf("instantiate wasm module: %w", instantiateErr)
		}
		log.Warn(ctx, "WASM module instantiation returned (exited?) immediately without error.")
	case <-time.After(2 * time.Second):
		log.Debug(ctx, "WASM module instantiation likely blocking (server running), proceeding...")
	}

	// Start a monitoring goroutine for WASM module exit/error.
	// Pass required values to the goroutine closure.
	go func(instanceToMonitor api.Module, compiledToClose api.Closer, errChan chan error) {
		// This blocks until the instance created by InstantiateModule exits or errors.
		instantiateErr := <-errChan

		w.mu.Lock() // Lock the specific MCPWasm instance
		log.Warn("WASM module exited/errored", "error", instantiateErr)

		// Critical: Check if the agent's current module is STILL the one we were monitoring.
		if w.wasmModule == instanceToMonitor {
			w.cleanupResources_locked() // Use the locked version
			log.Info("MCP WASM agent state cleaned up after module exit/error")
		} else {
			log.Debug("WASM module exited, but state already updated/module mismatch. Explicitly closing this instance's compiled ref.")
			// Manually close the compiled module ref associated with *this specific instance*
			if compiledToClose != nil {
				_ = compiledToClose.Close(context.Background())
			}
		}
		w.mu.Unlock()
	}(instance, compiledModule, instanceErrChan)

	// Success: prevent deferred cleanup of pipes and compiled module
	shouldClosePipesOnError = false
	shouldCloseCompiledOnError = false
	return hostStdinWriter, hostStdoutReader, instance, compiledModule, nil
}