feat: Streamable HTTP support (#643)

Co-authored-by: aagarwal25 <akshit_agarwal@intuit.com>
This commit is contained in:
Akshit97 2025-05-15 00:15:14 +05:30 committed by GitHub
parent 02b6e7013c
commit 1847008e0f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 247 additions and 14 deletions

View file

@ -0,0 +1,13 @@
# MCP Streamable HTTP Example
This example uses a local Streamable HTTP server in [server.py](server.py).
Run the example via:
```
uv run python examples/mcp/streamablehttp_example/main.py
```
## Details
The example uses the `MCPServerStreamableHttp` class from `agents.mcp`. The server runs in a sub-process at `https://localhost:8000/mcp`.

View file

@ -0,0 +1,83 @@
import asyncio
import os
import shutil
import subprocess
import time
from typing import Any
from agents import Agent, Runner, gen_trace_id, trace
from agents.mcp import MCPServer, MCPServerStreamableHttp
from agents.model_settings import ModelSettings
async def run(mcp_server: MCPServer):
agent = Agent(
name="Assistant",
instructions="Use the tools to answer the questions.",
mcp_servers=[mcp_server],
model_settings=ModelSettings(tool_choice="required"),
)
# Use the `add` tool to add two numbers
message = "Add these numbers: 7 and 22."
print(f"Running: {message}")
result = await Runner.run(starting_agent=agent, input=message)
print(result.final_output)
# Run the `get_weather` tool
message = "What's the weather in Tokyo?"
print(f"\n\nRunning: {message}")
result = await Runner.run(starting_agent=agent, input=message)
print(result.final_output)
# Run the `get_secret_word` tool
message = "What's the secret word?"
print(f"\n\nRunning: {message}")
result = await Runner.run(starting_agent=agent, input=message)
print(result.final_output)
async def main():
async with MCPServerStreamableHttp(
name="Streamable HTTP Python Server",
params={
"url": "http://localhost:8000/mcp",
},
) as server:
trace_id = gen_trace_id()
with trace(workflow_name="Streamable HTTP Example", trace_id=trace_id):
print(f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}\n")
await run(server)
if __name__ == "__main__":
# Let's make sure the user has uv installed
if not shutil.which("uv"):
raise RuntimeError(
"uv is not installed. Please install it: https://docs.astral.sh/uv/getting-started/installation/"
)
# We'll run the Streamable HTTP server in a subprocess. Usually this would be a remote server, but for this
# demo, we'll run it locally at http://localhost:8000/mcp
process: subprocess.Popen[Any] | None = None
try:
this_dir = os.path.dirname(os.path.abspath(__file__))
server_file = os.path.join(this_dir, "server.py")
print("Starting Streamable HTTP server at http://localhost:8000/mcp ...")
# Run `uv run server.py` to start the Streamable HTTP server
process = subprocess.Popen(["uv", "run", server_file])
# Give it 3 seconds to start
time.sleep(3)
print("Streamable HTTP server started. Running example...\n\n")
except Exception as e:
print(f"Error starting Streamable HTTP server: {e}")
exit(1)
try:
asyncio.run(main())
finally:
if process:
process.terminate()

View file

@ -0,0 +1,33 @@
import random
import requests
from mcp.server.fastmcp import FastMCP
# Create server
mcp = FastMCP("Echo Server")
@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers"""
print(f"[debug-server] add({a}, {b})")
return a + b
@mcp.tool()
def get_secret_word() -> str:
print("[debug-server] get_secret_word()")
return random.choice(["apple", "banana", "cherry"])
@mcp.tool()
def get_current_weather(city: str) -> str:
print(f"[debug-server] get_current_weather({city})")
endpoint = "https://wttr.in"
response = requests.get(f"{endpoint}/{city}")
return response.text
if __name__ == "__main__":
mcp.run(transport="streamable-http")

View file

@ -13,7 +13,7 @@ dependencies = [
"typing-extensions>=4.12.2, <5",
"requests>=2.0, <3",
"types-requests>=2.0, <3",
"mcp>=1.6.0, <2; python_version >= '3.10'",
"mcp>=1.8.0, <2; python_version >= '3.10'",
]
classifiers = [
"Typing :: Typed",

View file

@ -5,6 +5,8 @@ try:
MCPServerSseParams,
MCPServerStdio,
MCPServerStdioParams,
MCPServerStreamableHttp,
MCPServerStreamableHttpParams,
)
except ImportError:
pass
@ -17,5 +19,7 @@ __all__ = [
"MCPServerSseParams",
"MCPServerStdio",
"MCPServerStdioParams",
"MCPServerStreamableHttp",
"MCPServerStreamableHttpParams",
"MCPUtil",
]

View file

@ -10,7 +10,9 @@ from typing import Any, Literal
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp import ClientSession, StdioServerParameters, Tool as MCPTool, stdio_client
from mcp.client.sse import sse_client
from mcp.types import CallToolResult, JSONRPCMessage
from mcp.client.streamable_http import GetSessionIdCallback, streamablehttp_client
from mcp.shared.message import SessionMessage
from mcp.types import CallToolResult
from typing_extensions import NotRequired, TypedDict
from ..exceptions import UserError
@ -83,8 +85,9 @@ class _MCPServerWithClientSession(MCPServer, abc.ABC):
self,
) -> AbstractAsyncContextManager[
tuple[
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
MemoryObjectSendStream[JSONRPCMessage],
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
GetSessionIdCallback | None
]
]:
"""Create the streams for the server."""
@ -105,7 +108,11 @@ class _MCPServerWithClientSession(MCPServer, abc.ABC):
"""Connect to the server."""
try:
transport = await self.exit_stack.enter_async_context(self.create_streams())
read, write = transport
# streamablehttp_client returns (read, write, get_session_id)
# sse_client returns (read, write)
read, write, *_ = transport
session = await self.exit_stack.enter_async_context(
ClientSession(
read,
@ -232,8 +239,9 @@ class MCPServerStdio(_MCPServerWithClientSession):
self,
) -> AbstractAsyncContextManager[
tuple[
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
MemoryObjectSendStream[JSONRPCMessage],
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
GetSessionIdCallback | None
]
]:
"""Create the streams for the server."""
@ -302,8 +310,9 @@ class MCPServerSse(_MCPServerWithClientSession):
self,
) -> AbstractAsyncContextManager[
tuple[
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
MemoryObjectSendStream[JSONRPCMessage],
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
GetSessionIdCallback | None
]
]:
"""Create the streams for the server."""
@ -318,3 +327,84 @@ class MCPServerSse(_MCPServerWithClientSession):
def name(self) -> str:
"""A readable name for the server."""
return self._name
class MCPServerStreamableHttpParams(TypedDict):
"""Mirrors the params in`mcp.client.streamable_http.streamablehttp_client`."""
url: str
"""The URL of the server."""
headers: NotRequired[dict[str, str]]
"""The headers to send to the server."""
timeout: NotRequired[timedelta]
"""The timeout for the HTTP request. Defaults to 5 seconds."""
sse_read_timeout: NotRequired[timedelta]
"""The timeout for the SSE connection, in seconds. Defaults to 5 minutes."""
terminate_on_close: NotRequired[bool]
"""Terminate on close"""
class MCPServerStreamableHttp(_MCPServerWithClientSession):
"""MCP server implementation that uses the Streamable HTTP transport. See the [spec]
(https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http)
for details.
"""
def __init__(
self,
params: MCPServerStreamableHttpParams,
cache_tools_list: bool = False,
name: str | None = None,
client_session_timeout_seconds: float | None = 5,
):
"""Create a new MCP server based on the Streamable HTTP transport.
Args:
params: The params that configure the server. This includes the URL of the server,
the headers to send to the server, the timeout for the HTTP request, and the
timeout for the Streamable HTTP connection and whether we need to
terminate on close.
cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
cached and only fetched from the server once. If `False`, the tools list will be
fetched from the server on each call to `list_tools()`. The cache can be
invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
if you know the server will not change its tools list, because it can drastically
improve latency (by avoiding a round-trip to the server every time).
name: A readable name for the server. If not provided, we'll create one from the
URL.
client_session_timeout_seconds: the read timeout passed to the MCP ClientSession.
"""
super().__init__(cache_tools_list, client_session_timeout_seconds)
self.params = params
self._name = name or f"streamable_http: {self.params['url']}"
def create_streams(
self,
) -> AbstractAsyncContextManager[
tuple[
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
GetSessionIdCallback | None
]
]:
"""Create the streams for the server."""
return streamablehttp_client(
url=self.params["url"],
headers=self.params.get("headers", None),
timeout=self.params.get("timeout", timedelta(seconds=30)),
sse_read_timeout=self.params.get("sse_read_timeout", timedelta(seconds=60 * 5)),
terminate_on_close=self.params.get("terminate_on_close", True)
)
@property
def name(self) -> str:
"""A readable name for the server."""
return self._name

20
uv.lock
View file

@ -1047,7 +1047,7 @@ wheels = [
[[package]]
name = "mcp"
version = "1.6.0"
version = "1.8.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio", marker = "python_full_version >= '3.10'" },
@ -1055,13 +1055,14 @@ dependencies = [
{ name = "httpx-sse", marker = "python_full_version >= '3.10'" },
{ name = "pydantic", marker = "python_full_version >= '3.10'" },
{ name = "pydantic-settings", marker = "python_full_version >= '3.10'" },
{ name = "python-multipart", marker = "python_full_version >= '3.10'" },
{ name = "sse-starlette", marker = "python_full_version >= '3.10'" },
{ name = "starlette", marker = "python_full_version >= '3.10'" },
{ name = "uvicorn", marker = "python_full_version >= '3.10'" },
{ name = "uvicorn", marker = "python_full_version >= '3.10' and sys_platform != 'emscripten'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/95/d2/f587cb965a56e992634bebc8611c5b579af912b74e04eb9164bd49527d21/mcp-1.6.0.tar.gz", hash = "sha256:d9324876de2c5637369f43161cd71eebfd803df5a95e46225cab8d280e366723", size = 200031 }
sdist = { url = "https://files.pythonhosted.org/packages/7c/13/16b712e8a3be6a736b411df2fc6b4e75eb1d3e99b1cd57a3a1decf17f612/mcp-1.8.1.tar.gz", hash = "sha256:ec0646271d93749f784d2316fb5fe6102fb0d1be788ec70a9e2517e8f2722c0e", size = 265605 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/10/30/20a7f33b0b884a9d14dd3aa94ff1ac9da1479fe2ad66dd9e2736075d2506/mcp-1.6.0-py3-none-any.whl", hash = "sha256:7bd24c6ea042dbec44c754f100984d186620d8b841ec30f1b19eda9b93a634d0", size = 76077 },
{ url = "https://files.pythonhosted.org/packages/1c/5d/91cf0d40e40ae9ecf8d4004e0f9611eea86085aa0b5505493e0ff53972da/mcp-1.8.1-py3-none-any.whl", hash = "sha256:948e03783859fa35abe05b9b6c0a1d5519be452fc079dc8d7f682549591c1770", size = 119761 },
]
[[package]]
@ -1533,7 +1534,7 @@ requires-dist = [
{ name = "graphviz", marker = "extra == 'viz'", specifier = ">=0.17" },
{ name = "griffe", specifier = ">=1.5.6,<2" },
{ name = "litellm", marker = "extra == 'litellm'", specifier = ">=1.67.4.post1,<2" },
{ name = "mcp", marker = "python_full_version >= '3.10'", specifier = ">=1.6.0,<2" },
{ name = "mcp", marker = "python_full_version >= '3.10'", specifier = ">=1.8.0,<2" },
{ name = "numpy", marker = "python_full_version >= '3.10' and extra == 'voice'", specifier = ">=2.2.0,<3" },
{ name = "openai", specifier = ">=1.76.0" },
{ name = "pydantic", specifier = ">=2.10,<3" },
@ -2085,6 +2086,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/18/98a99ad95133c6a6e2005fe89faedf294a748bd5dc803008059409ac9b1e/python_dotenv-1.1.0-py3-none-any.whl", hash = "sha256:d7c01d9e2293916c18baf562d95698754b0dbbb5e74d457c45d4f6561fb9d55d", size = 20256 },
]
[[package]]
name = "python-multipart"
version = "0.0.20"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f3/87/f44d7c9f274c7ee665a29b885ec97089ec5dc034c7f3fafa03da9e39a09e/python_multipart-0.0.20.tar.gz", hash = "sha256:8dd0cab45b8e23064ae09147625994d090fa46f5b0d1e13af944c331a7fa9d13", size = 37158 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/45/58/38b5afbc1a800eeea951b9285d3912613f2603bdf897a4ab0f4bd7f405fc/python_multipart-0.0.20-py3-none-any.whl", hash = "sha256:8a62d3a8335e06589fe01f2a3e178cdcc632f3fbe0d492ad9ee0ec35aab1f104", size = 24546 },
]
[[package]]
name = "python-xlib"
version = "0.33"