refactor: keep MCP agent server process running

Refactor MCPAgent to maintain a persistent external server process.

Instead of starting and stopping the MCP server for each request, the agent
now uses sync.Once for lazy initialization on the first call.
It stores the running exec.Cmd, stdio pipes, and the mcp.Client
in the agent struct.

A sync.Mutex protects concurrent access to the client/pipes.
A goroutine monitors the process using cmd.Wait() and logs if it exits
unexpectedly.

This avoids the overhead of process creation/destruction on every
metadata retrieval request.
This commit is contained in:
Deluan 2025-04-19 12:04:39 -04:00
parent 51567a0bdf
commit 8326a20eda

View File

@ -2,10 +2,14 @@ package mcp
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"strings"
"sync"
"time"
mcp "github.com/metoro-io/mcp-golang"
"github.com/metoro-io/mcp-golang/transport/stdio"
@ -23,8 +27,15 @@ const (
)
// MCPAgent is the agent that retrieves metadata from an external MCP server.
// A single server process is started lazily (on the first request) and then
// reused, instead of being spawned per call.
type MCPAgent struct {
	// ds model.DataStore // Not needed for this PoC

	// One-time lazy startup of the server process and MCP client.
	initOnce sync.Once
	initErr  error // outcome of that one-time startup; nil on success

	// mu is held while a request uses the client, and while the process-exit
	// watcher clears cmd, stdin and client.
	mu     sync.Mutex
	cmd    *exec.Cmd      // the started server process; nil once it has exited
	stdin  io.WriteCloser // write end of the server's stdin; closed and nil once it has exited
	client *mcp.Client    // MCP client bound to the process pipes; nil if unusable
}
func mcpConstructor(ds model.DataStore) agents.Interface {
@ -33,7 +44,8 @@ func mcpConstructor(ds model.DataStore) agents.Interface {
log.Warn("MCP server executable not found, disabling agent", "path", mcpServerPath, "error", err)
return nil
}
log.Info("MCP Agent initialized", "serverPath", mcpServerPath)
log.Info("MCP Agent created, server will be started on first request", "serverPath", mcpServerPath)
// Initialize the struct, but don't start the process yet.
return &MCPAgent{}
}
@ -41,6 +53,86 @@ func (a *MCPAgent) AgentName() string {
return mcpAgentName
}
// ensureClientInitialized starts the MCP server process and initializes the
// client exactly once (guarded by a.initOnce).
//
// The outcome of that single attempt is stored in a.initErr and returned by
// this and every later call; a failed initialization is never retried.
// ctx is used for logging only: the process and the MCP handshake run on
// background contexts so that a cancelled request cannot tear the server down.
//
// It must not be called while holding a.mu, because both this function and
// the process-exit watcher it starts acquire that mutex.
func (a *MCPAgent) ensureClientInitialized(ctx context.Context) error {
	a.initOnce.Do(func() {
		log.Info(ctx, "Initializing MCP client and starting server process...", "serverPath", mcpServerPath)

		// Use background context for the command itself, so it doesn't get cancelled by the request context.
		// We manage its lifecycle independently.
		cmd := exec.CommandContext(context.Background(), mcpServerPath)

		stdin, err := cmd.StdinPipe()
		if err != nil {
			a.initErr = fmt.Errorf("failed to get stdin pipe for MCP server: %w", err)
			log.Error(ctx, "MCP init failed", "error", a.initErr)
			return
		}

		stdout, err := cmd.StdoutPipe()
		if err != nil {
			a.initErr = fmt.Errorf("failed to get stdout pipe for MCP server: %w", err)
			_ = stdin.Close() // Clean up stdin pipe
			log.Error(ctx, "MCP init failed", "error", a.initErr)
			return
		}

		// Capture stderr for debugging. It is only read after cmd.Wait()
		// returns, i.e. once os/exec has finished copying into it.
		var stderr strings.Builder
		cmd.Stderr = &stderr

		if err := cmd.Start(); err != nil {
			a.initErr = fmt.Errorf("failed to start MCP server process: %w", err)
			_ = stdin.Close()
			_ = stdout.Close()
			log.Error(ctx, "MCP init failed", "error", a.initErr)
			return
		}

		a.mu.Lock()
		a.cmd = cmd
		a.stdin = stdin
		a.mu.Unlock()
		log.Info(ctx, "MCP server process started", "pid", cmd.Process.Pid)

		// Watch the process and log if it exits unexpectedly. The goroutine
		// ends when the process does.
		// NOTE(review): os/exec documents that Wait closes the stdout pipe, so
		// output the server writes right before exiting may be lost — confirm
		// this is acceptable for the transport.
		go func() {
			err := cmd.Wait()
			log.Warn("MCP server process exited", "pid", cmd.Process.Pid, "error", err, "stderr", stderr.String())
			// Maybe add logic here to attempt restart or mark the agent as unhealthy?
			// For PoC, just log it.

			// Ensure pipes are closed and the agent state is cleared if the process exits.
			a.mu.Lock()
			defer a.mu.Unlock()
			if a.stdin != nil {
				_ = a.stdin.Close()
				a.stdin = nil
			}
			// stdout is typically closed by the StdioServerTransport when done reading.
			a.client = nil // Mark client as unusable
			a.cmd = nil
		}()

		// Create and initialize the MCP client
		transport := stdio.NewStdioServerTransportWithIO(stdout, stdin)
		client := mcp.NewClient(transport)

		// Use a specific context for initialization, not the potentially short-lived request context
		initCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Example timeout
		defer cancel()

		if _, err := client.Initialize(initCtx); err != nil {
			a.initErr = fmt.Errorf("failed to initialize MCP client: %w", err)
			log.Error(ctx, "MCP client initialization failed", "error", a.initErr)
			// Attempt to kill the process we just started. If it has already
			// exited there is nothing to kill, so that case is not an error.
			if killErr := cmd.Process.Kill(); killErr != nil && !errors.Is(killErr, os.ErrProcessDone) {
				log.Error(ctx, "Failed to kill MCP server process after init failure", "pid", cmd.Process.Pid, "error", killErr)
			}
			return
		}

		// Publish the client under the same lock the exit watcher uses, and
		// only if the process is still the one we started. Otherwise an
		// unlocked write here could race with the watcher and resurrect a
		// client whose server is already gone.
		a.mu.Lock()
		alive := a.cmd == cmd
		if alive {
			a.client = client
		}
		a.mu.Unlock()

		if !alive {
			log.Warn(ctx, "MCP server process exited during initialization, client not stored")
			return
		}
		log.Info(ctx, "MCP client initialized successfully")
	})
	return a.initErr
}
// getArtistBiographyArgs defines the structure for the MCP tool arguments.
// IMPORTANT: Field names MUST be exported (start with uppercase) for JSON marshalling,
// but the `json` tags determine the actual field names sent over MCP.
@ -52,56 +144,25 @@ type getArtistBiographyArgs struct {
// GetArtistBiography retrieves the artist biography by calling the external MCP server.
func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) {
// Ensure the client is initialized and the server is running
if err := a.ensureClientInitialized(ctx); err != nil {
log.Error(ctx, "MCP client not initialized, cannot get biography", "error", err)
// Wrap the initErr with %w so callers can still unwrap and see the original init problem
return "", fmt.Errorf("MCP agent not initialized: %w", err)
}
// Lock to ensure only one request uses the client/pipes at a time
a.mu.Lock()
defer a.mu.Unlock()
// Check if the client is still valid (it might have been nilled if the process died)
if a.client == nil {
log.Error(ctx, "MCP client is not valid (server process likely died)")
return "", fmt.Errorf("MCP agent process is not running")
}
log.Debug(ctx, "Calling MCP agent GetArtistBiography", "id", id, "name", name, "mbid", mbid)
// Prepare command with context for cancellation handling
cmd := exec.CommandContext(ctx, mcpServerPath)
// Get pipes for stdin and stdout
stdin, err := cmd.StdinPipe()
if err != nil {
log.Error(ctx, "Failed to get stdin pipe for MCP server", "error", err)
return "", fmt.Errorf("failed to get stdin pipe: %w", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Error(ctx, "Failed to get stdout pipe for MCP server", "error", err)
return "", fmt.Errorf("failed to get stdout pipe: %w", err)
}
// Capture stderr for debugging, but don't block on it
var stderr strings.Builder
cmd.Stderr = &stderr
// Start the MCP server process
if err := cmd.Start(); err != nil {
log.Error(ctx, "Failed to start MCP server process", "path", mcpServerPath, "error", err)
return "", fmt.Errorf("failed to start MCP server: %w", err)
}
// Ensure the process is killed eventually
defer func() {
if err := cmd.Process.Kill(); err != nil {
log.Error(ctx, "Failed to kill MCP server process", "pid", cmd.Process.Pid, "error", err)
} else {
log.Debug(ctx, "Killed MCP server process", "pid", cmd.Process.Pid)
}
// Log stderr if it contains anything
if stderr.Len() > 0 {
log.Warn(ctx, "MCP server stderr output", "pid", cmd.Process.Pid, "stderr", stderr.String())
}
}()
log.Debug(ctx, "MCP server process started", "pid", cmd.Process.Pid)
// Create and initialize the MCP client
transport := stdio.NewStdioServerTransportWithIO(stdout, stdin)
client := mcp.NewClient(transport)
if _, err := client.Initialize(ctx); err != nil {
log.Error(ctx, "Failed to initialize MCP client", "error", err)
return "", fmt.Errorf("failed to initialize MCP client: %w", err)
}
log.Debug(ctx, "MCP client initialized")
// Prepare arguments for the tool call
args := getArtistBiographyArgs{
ID: id,
@ -109,18 +170,26 @@ func (a *MCPAgent) GetArtistBiography(ctx context.Context, id, name, mbid string
Mbid: mbid,
}
// Call the tool
// Call the tool using the persistent client
log.Debug(ctx, "Calling MCP tool", "tool", mcpToolName, "args", args)
response, err := client.CallTool(ctx, mcpToolName, args)
response, err := a.client.CallTool(ctx, mcpToolName, args)
if err != nil {
// Handle potential pipe closures or other communication errors
log.Error(ctx, "Failed to call MCP tool", "tool", mcpToolName, "error", err)
// Check if the error indicates a broken pipe, suggesting the server died
if errors.Is(err, io.ErrClosedPipe) || strings.Contains(err.Error(), "broken pipe") || strings.Contains(err.Error(), "EOF") {
log.Warn(ctx, "MCP tool call failed, possibly due to server process exit")
// Attempt to reset for next time? Or just fail?
// For now, just return a clear error.
return "", fmt.Errorf("MCP agent process communication error: %w", err)
}
return "", fmt.Errorf("failed to call MCP tool '%s': %w", mcpToolName, err)
}
// Process the response
if response == nil || len(response.Content) == 0 || response.Content[0].TextContent == nil {
log.Warn(ctx, "MCP tool returned empty or invalid response", "tool", mcpToolName)
return "", agents.ErrNotFound // Or a more specific error?
return "", agents.ErrNotFound
}
bio := response.Content[0].TextContent.Text