arcade-mcp/libs/arcade-serve/arcade_serve/mcp/message_processor.py
Sam Partee b6b4cd0a4c
🏗️ Restructure: Multi-Package Architecture + uv Migration (#412)
### Overview
Major restructuring from monolithic `arcade-ai` package to modular
library architecture with standardized uv-based dependency management.

![arcade-ai Monorepo
(2)](https://github.com/user-attachments/assets/25f102b0-bb87-4a04-9701-d227d05664b1)

### New Package Structure
- **`arcade-tdk`** - Lightweight toolkit development kit (core
decorators, auth)
- **`arcade-core`** - Core execution engine and catalog functionality  
- **`arcade-serve`** - FastAPI/MCP server components
- **`arcade-ai`** - Meta package that includes CLI functionality.
Optionally include evals via the `evals` extra. Optionally include all
packages via the `all` extra.

### Key Benefits
- **Lighter Dependencies**: Toolkits now depend only on `arcade-tdk` (~2
deps) vs full `arcade-ai` (~30+ deps)
- **Faster Builds**: uv provides 10-100x faster dependency resolution
and installation
- **Better Modularity**: Clear separation of concerns, consumers import
only what they need
- **Standard Tooling**: Eliminates custom poetry scripts, uses standard
Python packaging

### Migration Impact
- All 20 toolkits converted from poetry → uv with `arcade-tdk`
dependencies plus `arcade-ai[evals]` and `arcade-serve` dev
dependencies. When developing locally, devs should install toolkits via
`make install-local`.
- Modern Python 3.10+ type hints throughout
- Standardized build system with hatchling backend
- Enhanced Makefile with robust toolkit management commands
- Removed `arcade dev` CLI command
- Reduce the number of files created by `arcade new` and add an option
to not generate a tests and evals folder.

This foundation enables faster development cycles and cleaner dependency
chains for the growing toolkit ecosystem.

### Todo After this PR is merged
- [ ] Post-merge workflow(s) (release & publish containers, etc)
- [ ] Release order plan. @EricGustin suggests releasing in the
following order:
    1. `arcade-core` version 0.1.0
    2. `arcade-serve` version 0.1.0 and `arcade-tdk` version 0.1.0
    3. `arcade-ai` version 2.0.0
4. Patch release for all toolkits (all changes in toolkits are internal
refactors)
- [ ] [Update docs](https://github.com/ArcadeAI/docs/pull/318)

---------

Co-authored-by: Eric Gustin <eric@arcade.dev>
Co-authored-by: Eric Gustin <34000337+EricGustin@users.noreply.github.com>
2025-06-11 16:48:17 -07:00

83 lines
3.1 KiB
Python

import inspect
import json
import logging
from typing import Any, Callable, TypeVar
from arcade_serve.mcp.types import InitializeRequest, JSONRPCRequest, MCPMessage
logger = logging.getLogger("arcade.mcp")
T = TypeVar("T")
# Type definition for middleware functions
MessageProcessor = Callable[[Any, str], Any]
class MCPMessageProcessor:
"""
Processes MCP messages through a chain of middleware.
Supports both synchronous and asynchronous middleware.
"""
def __init__(self) -> None:
self.middleware: list[Callable[[MCPMessage, str], Any]] = []
def add_middleware(self, mw: Callable[[MCPMessage, str], Any]) -> None:
self.middleware.append(mw)
async def process(self, message: Any, direction: str) -> Any: # noqa: C901
# First, try to parse the message if it's a string
if isinstance(message, str):
# Strip any whitespace including newlines
message = message.strip()
if not message:
return None
try:
parsed = json.loads(message)
if isinstance(parsed, dict):
method = parsed.get("method")
# Convert to appropriate message type
if method == "initialize" and "id" in parsed:
logger.debug(f"Parsed initialize request: {parsed}")
message = InitializeRequest(**parsed)
elif method and method.startswith("notifications/"):
# It's a notification, log it but pass through as dict
logger.debug(f"Received notification: {method}")
# Keep as parsed dict to avoid validation errors on unknown notifications
message = parsed
elif "method" in parsed and "id" in parsed:
# Regular method request
logger.debug(f"Parsed method request: {method}")
message = JSONRPCRequest(**parsed)
# Other message types can be handled similarly
except json.JSONDecodeError:
logger.warning(f"Failed to parse message as JSON: {message[:100]}...")
except Exception:
logger.exception("Error processing message")
# Process through middleware chain
result = message
for mw in self.middleware:
try:
if inspect.iscoroutinefunction(mw):
result = await mw(result, direction)
else:
result = mw(result, direction)
except Exception:
logger.exception(f"Error in middleware {mw}")
return result
async def process_request(self, message: Any) -> Any:
return await self.process(message, "request")
async def process_response(self, message: Any) -> Any:
return await self.process(message, "response")
def create_message_processor(*middleware: MessageProcessor) -> MCPMessageProcessor:
processor = MCPMessageProcessor()
for m in middleware:
if m is not None:
processor.add_middleware(m)
return processor