diff --git a/libs/arcade-mcp-server/arcade_mcp_server/mcp_app.py b/libs/arcade-mcp-server/arcade_mcp_server/mcp_app.py index 3bccbf6b..c64ec291 100644 --- a/libs/arcade-mcp-server/arcade_mcp_server/mcp_app.py +++ b/libs/arcade-mcp-server/arcade_mcp_server/mcp_app.py @@ -6,23 +6,27 @@ Provides a clean, minimal API for building MCP servers with lazy initialization. from __future__ import annotations +import asyncio import os +import subprocess import sys from pathlib import Path from typing import Any, Callable, Literal, ParamSpec, TypeVar +import uvicorn from arcade_core.catalog import MaterializedTool, ToolCatalog, ToolDefinitionError from arcade_tdk.auth import ToolAuthorization from arcade_tdk.error_adapters import ErrorAdapter from arcade_tdk.tool import tool as tool_decorator from dotenv import load_dotenv from loguru import logger +from watchfiles import watch from arcade_mcp_server.exceptions import ServerError from arcade_mcp_server.server import MCPServer from arcade_mcp_server.settings import MCPSettings, ServerSettings from arcade_mcp_server.types import Prompt, PromptMessage, Resource -from arcade_mcp_server.worker import run_arcade_mcp +from arcade_mcp_server.worker import create_arcade_mcp P = ParamSpec("P") T = TypeVar("T") @@ -231,20 +235,18 @@ class MCPApp: # Since the transport could have changed since __init__, we need to setup logging again self._setup_logging(transport == "stdio") + if os.getenv("ARCADE_MCP_CHILD_PROCESS") == "1": + # parent watcher has already been setup + reload = False + logger.info(f"Starting {self.name} v{self.version} with {len(self._catalog)} tools") if transport in ["http", "streamable-http", "streamable"]: - run_arcade_mcp( - catalog=self._catalog, - host=host, - port=port, - reload=reload, - mcp_settings=self._mcp_settings, - **self.server_kwargs, - ) + if reload: + self._run_with_reload(host, port) + else: + self._create_and_run_server(host, port) elif transport == "stdio": - import asyncio - from arcade_mcp_server.__main__ import run_stdio_server asyncio.run( @@ -257,6 +259,84 @@ class MCPApp: else: raise ServerError(f"Invalid transport: {transport}") + def _run_with_reload(self, host: str, port: int) -> None: + """ + Run with file watching for auto-reload. + + This method runs as the parent process that watches for file changes + and spawns/restarts child processes to run the actual server. + """ + env_file_path = Path.cwd() / ".env" + + def start_server_process() -> subprocess.Popen: + """Start a child process running the server.""" + env = os.environ.copy() + env["ARCADE_MCP_CHILD_PROCESS"] = "1" + + return subprocess.Popen( + [sys.executable, *sys.argv], + env=env, + ) + + def shutdown_server_process(process: subprocess.Popen, reason: str = "reload") -> None: + """Shutdown server process gracefully with fallback to force kill.""" + logger.info(f"Shutting down server for {reason}...") + process.terminate() + + try: + process.wait(timeout=5) + logger.info("Server shut down gracefully") + except subprocess.TimeoutExpired: + logger.warning( + "Server did not shut down within 5 seconds (likely due to active client connections). " + "Force killing server process..." + ) + process.kill() + process.wait() + logger.info("Server force killed") + + logger.info("Starting file watcher for auto-reload") + process = start_server_process() + + try: + + def watch_filter(change: Any, path: str) -> bool: + return path.endswith(".py") or (Path(path) == env_file_path) + + for changes in watch(".", watch_filter=watch_filter): + logger.info(f"Detected changes in {len(changes)} file(s), restarting server...") + shutdown_server_process(process, reason="reload") + process = start_server_process() + except KeyboardInterrupt: + logger.info("Received shutdown signal") + shutdown_server_process(process, reason="shutdown") + logger.info("File watcher stopped") + + def _create_and_run_server(self, host: str, port: int) -> None: + """ + Create and run the server directly without reload. + + This is used when reload=False or when running as a child process. + """ + debug = self.log_level == "DEBUG" + log_level = "debug" if debug else "info" + + app = create_arcade_mcp( + catalog=self._catalog, + mcp_settings=self._mcp_settings, + debug=debug, + **self.server_kwargs, + ) + + uvicorn.run( + app, + host=host, + port=port, + log_level=log_level, + reload=False, # MCPApp handles its own reload via parent/child process pattern + lifespan="on", + ) + @staticmethod def _get_configuration_overrides( host: str, port: int, transport: TransportType, reload: bool diff --git a/libs/arcade-mcp-server/arcade_mcp_server/worker.py b/libs/arcade-mcp-server/arcade_mcp_server/worker.py index 6fc6a4f9..8755b1e7 100644 --- a/libs/arcade-mcp-server/arcade_mcp_server/worker.py +++ b/libs/arcade-mcp-server/arcade_mcp_server/worker.py @@ -282,6 +282,9 @@ def run_arcade_mcp( ) -> None: """ Run the integrated Arcade MCP server with uvicorn. + + This is used for module execution (`arcade mcp` and `python -m arcade_mcp_server`) only. + MCPApp has its own reload mechanism. """ import os diff --git a/libs/arcade-mcp-server/pyproject.toml b/libs/arcade-mcp-server/pyproject.toml index afe4f24e..8c1e98bb 100644 --- a/libs/arcade-mcp-server/pyproject.toml +++ b/libs/arcade-mcp-server/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "pydantic>=2.0.0", "fastapi>=0.100.0", "uvicorn>=0.30.0", + "watchfiles>=0.18.0", # included with uvicorn, but listed to be explicit "sse-starlette>=2.0.0", "starlette>=0.37.0", "anyio>=4.0.0", diff --git a/libs/tests/arcade_mcp_server/test_mcp_app.py b/libs/tests/arcade_mcp_server/test_mcp_app.py index a88980dc..7d717bb0 100644 --- a/libs/tests/arcade_mcp_server/test_mcp_app.py +++ b/libs/tests/arcade_mcp_server/test_mcp_app.py @@ -1,6 +1,9 @@ """Tests for MCPApp initialization and basic functionality.""" +import subprocess +import sys from typing import Annotated +from unittest.mock import MagicMock, Mock, patch import pytest from arcade_core.catalog import MaterializedTool @@ -15,7 +18,15 @@ class TestMCPApp: @pytest.fixture def mcp_app(self) -> MCPApp: """Create an MCP app.""" - return MCPApp(name="TestMCPApp", version="1.0.0") + app = MCPApp(name="TestMCPApp", version="1.0.0") + + # Add a sample tool so the app doesn't exit when run() is called + @app.tool + def sample_tool(message: Annotated[str, "A message"]) -> str: + """A sample tool for testing.""" + return f"Response: {message}" + + return app def test_mcp_app_initialization(self): """Test MCPApp initialization creates proper settings.""" @@ -89,6 +100,8 @@ class TestMCPApp: def test_tool(self, mcp_app: MCPApp): """Test the MCPApp tool decorator.""" + initial_tool_count = len(mcp_app._catalog) + # Test decorator without parameters @mcp_app.tool def simple_tool(message: Annotated[str, "A message"]) -> str: @@ -102,7 +115,7 @@ class TestMCPApp: return f"Response: {message}" # Verify both tools were added - assert len(mcp_app._catalog) == 2 + assert len(mcp_app._catalog) == initial_tool_count + 2 # Verify decorator attributes assert hasattr(simple_tool, "__tool_name__") @@ -311,3 +324,196 @@ class TestMCPApp: monkeypatch.delenv("ARCADE_SERVER_HOST") monkeypatch.delenv("ARCADE_SERVER_PORT") monkeypatch.delenv("ARCADE_SERVER_TRANSPORT") + + def test_create_and_run_server(self, mcp_app: MCPApp): + """Test _create_and_run_server method with mocked dependencies.""" + with patch("arcade_mcp_server.mcp_app.create_arcade_mcp") as mock_create, patch( + "arcade_mcp_server.mcp_app.uvicorn" + ) as mock_uvicorn: + mock_fastapi_app = Mock() + mock_create.return_value = mock_fastapi_app + + # Test with INFO log level + mcp_app.log_level = "INFO" + mcp_app._create_and_run_server("127.0.0.1", 8000) + + mock_create.assert_called_once_with( + catalog=mcp_app._catalog, + mcp_settings=mcp_app._mcp_settings, + debug=False, + ) + mock_uvicorn.run.assert_called_once_with( + mock_fastapi_app, + host="127.0.0.1", + port=8000, + log_level="info", + reload=False, + lifespan="on", + ) + + # Test with DEBUG log level + with patch("arcade_mcp_server.mcp_app.create_arcade_mcp") as mock_create, patch( + "arcade_mcp_server.mcp_app.uvicorn" + ) as mock_uvicorn: + mock_fastapi_app = Mock() + mock_create.return_value = mock_fastapi_app + + mcp_app.log_level = "DEBUG" + mcp_app._create_and_run_server("192.168.1.1", 9000) + + mock_create.assert_called_once_with( + catalog=mcp_app._catalog, + mcp_settings=mcp_app._mcp_settings, + debug=True, + ) + mock_uvicorn.run.assert_called_once_with( + mock_fastapi_app, + host="192.168.1.1", + port=9000, + log_level="debug", + reload=False, + lifespan="on", + ) + + def test_run_with_reload_spawns_child_process(self, mcp_app: MCPApp): + """Test _run_with_reload spawns child process with correct environment.""" + mock_process = Mock() + mock_process.terminate = Mock() + mock_process.wait = Mock() + + with patch("arcade_mcp_server.mcp_app.subprocess.Popen") as mock_popen, patch( + "arcade_mcp_server.mcp_app.watch" + ) as mock_watch: + mock_popen.return_value = mock_process + # Return empty iterator to exit immediately + mock_watch.return_value = iter([]) + + mcp_app._run_with_reload("127.0.0.1", 8000) + + # Verify Popen was called with correct args + mock_popen.assert_called_once() + call_args = mock_popen.call_args + assert call_args[0][0] == [sys.executable, *sys.argv] + assert call_args[1]["env"]["ARCADE_MCP_CHILD_PROCESS"] == "1" + + def test_run_with_reload_restarts_on_changes(self, mcp_app: MCPApp): + """Test _run_with_reload restarts server when file changes detected.""" + mock_process1 = Mock() + mock_process2 = Mock() + + with patch("arcade_mcp_server.mcp_app.subprocess.Popen") as mock_popen, patch( + "arcade_mcp_server.mcp_app.watch" + ) as mock_watch: + mock_popen.side_effect = [mock_process1, mock_process2] + # Yield one set of changes then stop + mock_watch.return_value = iter([{("change", "test.py")}]) + + mcp_app._run_with_reload("127.0.0.1", 8000) + + # Verify both processes were created + assert mock_popen.call_count == 2 + + # Verify first process was terminated + mock_process1.terminate.assert_called_once() + mock_process1.wait.assert_called() + + def test_run_with_reload_graceful_shutdown(self, mcp_app: MCPApp): + """Test _run_with_reload gracefully shuts down process.""" + mock_process = Mock() + mock_process.wait = Mock() # Succeeds without timeout + + with patch("arcade_mcp_server.mcp_app.subprocess.Popen") as mock_popen, patch( + "arcade_mcp_server.mcp_app.watch" + ) as mock_watch: + mock_popen.return_value = mock_process + mock_watch.return_value = iter([{("change", "test.py")}]) + + mcp_app._run_with_reload("127.0.0.1", 8000) + + # Verify graceful shutdown + mock_process.terminate.assert_called() + mock_process.wait.assert_called() + mock_process.kill.assert_not_called() + + def test_run_with_reload_force_kill_on_timeout(self, mcp_app: MCPApp): + """Test _run_with_reload force kills process on timeout.""" + mock_process = Mock() + # First wait times out, second succeeds + mock_process.wait = Mock( + side_effect=[subprocess.TimeoutExpired("cmd", 5), None] + ) + + with patch("arcade_mcp_server.mcp_app.subprocess.Popen") as mock_popen, patch( + "arcade_mcp_server.mcp_app.watch" + ) as mock_watch: + mock_popen.return_value = mock_process + mock_watch.return_value = iter([{("change", "test.py")}]) + + mcp_app._run_with_reload("127.0.0.1", 8000) + + # Verify terminate -> wait -> kill -> wait sequence + mock_process.terminate.assert_called() + assert mock_process.wait.call_count == 2 + mock_process.kill.assert_called_once() + + def test_run_with_reload_keyboard_interrupt(self, mcp_app: MCPApp): + """Test _run_with_reload handles KeyboardInterrupt gracefully.""" + mock_process = Mock() + + with patch("arcade_mcp_server.mcp_app.subprocess.Popen") as mock_popen, patch( + "arcade_mcp_server.mcp_app.watch" + ) as mock_watch: + mock_popen.return_value = mock_process + mock_watch.side_effect = KeyboardInterrupt() + + # Should not raise exception + mcp_app._run_with_reload("127.0.0.1", 8000) + + # Verify process was shut down + mock_process.terminate.assert_called_once() + + def test_run_routes_to_reload_method(self, mcp_app: MCPApp): + """Test run() routes to _run_with_reload when reload=True.""" + with patch.object(mcp_app, "_run_with_reload") as mock_reload, patch.object( + mcp_app, "_create_and_run_server" + ) as mock_direct: + mcp_app.run(reload=True, transport="http", host="127.0.0.1", port=8000) + + mock_reload.assert_called_once_with("127.0.0.1", 8000) + mock_direct.assert_not_called() + + def test_run_routes_to_direct_method(self, mcp_app: MCPApp): + """Test run() routes to _create_and_run_server when reload=False.""" + with patch.object(mcp_app, "_run_with_reload") as mock_reload, patch.object( + mcp_app, "_create_and_run_server" + ) as mock_direct: + mcp_app.run(reload=False, transport="http", host="127.0.0.1", port=8000) + + mock_direct.assert_called_once_with("127.0.0.1", 8000) + mock_reload.assert_not_called() + + def test_run_child_process_disables_reload(self, mcp_app: MCPApp, monkeypatch): + """Test run() disables reload when ARCADE_MCP_CHILD_PROCESS is set.""" + monkeypatch.setenv("ARCADE_MCP_CHILD_PROCESS", "1") + + with patch.object(mcp_app, "_run_with_reload") as mock_reload, patch.object( + mcp_app, "_create_and_run_server" + ) as mock_direct: + mcp_app.run(reload=True, transport="http", host="127.0.0.1", port=8000) + + # Should route to direct method even though reload=True + mock_direct.assert_called_once_with("127.0.0.1", 8000) + mock_reload.assert_not_called() + + def test_run_stdio_unaffected_by_reload(self, mcp_app: MCPApp): + """Test run() with stdio transport is unaffected by reload flag.""" + with patch("arcade_mcp_server.__main__.run_stdio_server") as mock_stdio: + # Test with reload=True + mcp_app.run(reload=True, transport="stdio") + mock_stdio.assert_called_once() + + mock_stdio.reset_mock() + + # Test with reload=False + mcp_app.run(reload=False, transport="stdio") + mock_stdio.assert_called_once()