arcade-mcp/libs/tests/arcade_mcp_server/test_resource_server_auth.py
Eric Gustin 28c1863ee3
Support Ed25519 Algorithm (#742)
Ed25519 is needed for Arcade AS. This required migrating from
`python-jose` to `joserfc`, because `python-jose` didn't seem to support
Ed25519
2026-01-16 15:55:05 -08:00

1455 lines
56 KiB
Python

import base64
import time
from unittest.mock import Mock, patch
import pytest
from arcade_core.catalog import ToolCatalog
from arcade_mcp_server.resource_server import (
AccessTokenValidationOptions,
AuthorizationServerEntry,
JWKSTokenValidator,
ResourceServerAuth,
)
from arcade_mcp_server.resource_server.base import (
InvalidTokenError,
ResourceOwner,
TokenExpiredError,
)
from arcade_mcp_server.resource_server.middleware import ResourceServerMiddleware
from arcade_mcp_server.worker import create_arcade_mcp
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from joserfc import jwt
from joserfc.jwk import OKPKey, RSAKey
# Test fixtures
@pytest.fixture(autouse=True)
def clean_auth_env(monkeypatch):
"""Clean server auth environment variables before each test."""
env_vars = [
"MCP_RESOURCE_SERVER_CANONICAL_URL",
"MCP_RESOURCE_SERVER_AUTHORIZATION_SERVERS",
]
for var in env_vars:
monkeypatch.delenv(var, raising=False)
yield
@pytest.fixture
def rsa_keypair():
"""Generate RSA key pair for testing."""
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
public_key = private_key.public_key()
return private_key, public_key
@pytest.fixture
def rsa_joserfc_key(rsa_keypair):
"""Generate joserfc RSAKey from keypair."""
private_key, _ = rsa_keypair
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
return RSAKey.import_key(pem)
@pytest.fixture
def serialized_private_key(rsa_keypair):
"""Generate private key as PEM format for testing."""
private_key, _ = rsa_keypair
# Serialize private key to PEM format
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
return pem
@pytest.fixture
def jwks_data(rsa_keypair):
"""Generate JWKS data for testing."""
_, public_key = rsa_keypair
# Export public key in JWK format
public_numbers = public_key.public_numbers()
n = public_numbers.n
e = public_numbers.e
# Convert to base64url
n_bytes = n.to_bytes((n.bit_length() + 7) // 8, byteorder="big")
e_bytes = e.to_bytes((e.bit_length() + 7) // 8, byteorder="big")
n_b64 = base64.urlsafe_b64encode(n_bytes).decode("utf-8").rstrip("=")
e_b64 = base64.urlsafe_b64encode(e_bytes).decode("utf-8").rstrip("=")
return {
"keys": [
{
"kty": "RSA",
"kid": "test-key-1",
"use": "sig",
"alg": "RS256",
"n": n_b64,
"e": e_b64,
}
]
}
@pytest.fixture
def valid_jwt_token(rsa_joserfc_key):
"""Generate valid JWT token for testing."""
claims = {
"sub": "user123",
"email": "user@example.com",
"iss": "https://auth.example.com",
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
return token
@pytest.fixture
def expired_jwt_token(rsa_joserfc_key):
"""Generate expired JWT token for testing."""
claims = {
"sub": "user123",
"email": "user@example.com",
"iss": "https://auth.example.com",
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) - 3600,
"iat": int(time.time()) - 7200,
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
return token
# Ed25519 fixtures
@pytest.fixture
def ed25519_keypair():
"""Generate Ed25519 key pair for testing."""
private_key = Ed25519PrivateKey.generate()
public_key = private_key.public_key()
return private_key, public_key
@pytest.fixture
def ed25519_joserfc_key(ed25519_keypair):
"""Generate joserfc OKPKey from Ed25519 keypair."""
private_key, _ = ed25519_keypair
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
return OKPKey.import_key(pem)
@pytest.fixture
def ed25519_jwks_data(ed25519_keypair):
"""Generate Ed25519 JWKS data for testing."""
_, public_key = ed25519_keypair
# Get the raw public key bytes
public_bytes = public_key.public_bytes_raw()
# Base64url encode the public key
x_b64 = base64.urlsafe_b64encode(public_bytes).decode("utf-8").rstrip("=")
return {
"keys": [
{
"kty": "OKP",
"kid": "ed25519-key-1",
"use": "sig",
"alg": "Ed25519",
"crv": "Ed25519",
"x": x_b64,
}
]
}
@pytest.fixture
def valid_ed25519_token(ed25519_joserfc_key):
"""Generate valid Ed25519 JWT token for testing."""
claims = {
"sub": "user456",
"email": "ed25519user@example.com",
"iss": "https://cloud.arcade.dev/oauth2",
"aud": "urn:arcade:mcp",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "Ed25519", "kid": "ed25519-key-1"}
# Ed25519 is not in joserfc's recommended algorithms, so we must explicitly allow it
token = jwt.encode(header, claims, ed25519_joserfc_key, algorithms=["Ed25519"])
return token
@pytest.fixture
def expired_ed25519_token(ed25519_joserfc_key):
"""Generate expired Ed25519 JWT token for testing."""
claims = {
"sub": "user456",
"email": "ed25519user@example.com",
"iss": "https://cloud.arcade.dev/oauth2",
"aud": "urn:arcade:mcp",
"exp": int(time.time()) - 3600,
"iat": int(time.time()) - 7200,
}
header = {"alg": "Ed25519", "kid": "ed25519-key-1"}
# Ed25519 is not in joserfc's recommended algorithms, so we must explicitly allow it
token = jwt.encode(header, claims, ed25519_joserfc_key, algorithms=["Ed25519"])
return token
class TestJWKSTokenValidator:
"""Tests for JWKSTokenValidator class."""
@pytest.mark.asyncio
async def test_validate_valid_token(self, valid_jwt_token, jwks_data):
"""Test validating a valid JWT token."""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
)
user = await validator.validate_token(valid_jwt_token)
assert isinstance(user, ResourceOwner)
assert user.user_id == "user123"
assert user.email == "user@example.com"
assert user.claims["iss"] == "https://auth.example.com"
@pytest.mark.asyncio
async def test_validate_expired_token(self, expired_jwt_token, jwks_data):
"""Test validating an expired JWT token."""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
)
with pytest.raises(TokenExpiredError):
await validator.validate_token(expired_jwt_token)
@pytest.mark.asyncio
async def test_validate_wrong_audience(self, rsa_joserfc_key, jwks_data):
"""Test validating token with wrong audience."""
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": "https://wrong-server.com", # Wrong audience
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
)
with pytest.raises(InvalidTokenError, match="audience"):
await validator.validate_token(token)
@pytest.mark.asyncio
async def test_validate_wrong_issuer(self, rsa_joserfc_key, jwks_data):
"""Test validating token with wrong issuer."""
claims = {
"sub": "user123",
"iss": "https://wrong-issuer.com", # Wrong issuer
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
)
with pytest.raises(InvalidTokenError, match="issuer"):
await validator.validate_token(token)
@pytest.mark.asyncio
async def test_validate_missing_sub_claim(self, rsa_joserfc_key, jwks_data):
"""Test validating token without sub claim."""
claims = {
"iss": "https://auth.example.com",
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
)
with pytest.raises(InvalidTokenError, match="sub"):
await validator.validate_token(token)
@pytest.mark.asyncio
async def test_jwks_caching(self, valid_jwt_token, jwks_data):
"""Test that JWKS is cached to avoid repeated fetches."""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
cache_ttl=3600,
)
# First validation should fetch JWKS
await validator.validate_token(valid_jwt_token)
assert mock_get.call_count == 1
# Second validation should use cached JWKS
await validator.validate_token(valid_jwt_token)
assert mock_get.call_count == 1
@pytest.mark.asyncio
async def test_validate_multiple_audiences_single_token_aud(self, rsa_joserfc_key, jwks_data):
"""Test validator with multiple audiences accepts token with matching single aud."""
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": "https://old-mcp.example.com", # Matches first audience
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience=["https://old-mcp.example.com", "https://new-mcp.example.com"],
)
user = await validator.validate_token(token)
assert user.user_id == "user123"
@pytest.mark.asyncio
async def test_validate_multiple_audiences_list_token_aud(self, rsa_joserfc_key, jwks_data):
"""Test validator with multiple audiences accepts token with list aud."""
# Token with list of audiences where one matches the validator's accepted audiences
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": ["https://api1.com", "https://new-mcp.example.com"], # Second matches
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience=["https://old-mcp.example.com", "https://new-mcp.example.com"],
)
user = await validator.validate_token(token)
assert user.user_id == "user123"
@pytest.mark.asyncio
async def test_validate_multiple_audiences_no_match(self, rsa_joserfc_key, jwks_data):
"""Test validator with multiple audiences rejects token with non-matching aud."""
# Token with audience that doesn't match any of validator's accepted audiences
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": "https://different-server.com", # Doesn't match
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience=["https://old-mcp.example.com", "https://new-mcp.example.com"],
)
with pytest.raises(InvalidTokenError, match="audience"):
await validator.validate_token(token)
@pytest.mark.asyncio
async def test_validate_single_audience_with_list_token_aud(self, rsa_joserfc_key, jwks_data):
"""Test validator with single audience accepts token with list aud containing match."""
# Token with list of audiences where one matches validator's single audience
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": ["https://api1.com", "https://mcp.example.com/mcp", "https://api2.com"],
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp", # Single audience
)
user = await validator.validate_token(token)
assert user.user_id == "user123"
@pytest.mark.asyncio
async def test_validate_multiple_issuers_efficient(self, rsa_joserfc_key, jwks_data):
"""Test that multi-issuer validation is efficient (single decode)."""
# Token from second issuer in list
claims = {
"sub": "user123",
"iss": "https://auth2.example.com", # Second in list
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
with patch(
"arcade_mcp_server.resource_server.validators.jwks.jwt.decode",
wraps=jwt.decode,
) as mock_decode:
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer=[
"https://auth1.example.com",
"https://auth2.example.com",
"https://auth3.example.com",
],
audience="https://mcp.example.com/mcp",
)
user = await validator.validate_token(token)
assert user.user_id == "user123"
# Should only need to decode once, not 3 times
assert mock_decode.call_count == 1
@pytest.mark.asyncio
async def test_validate_nbf_claim_before_time(self, rsa_joserfc_key, jwks_data):
"""Test that token with nbf claim in the future is rejected."""
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) + 7200, # expires in 2 hours
"iat": int(time.time()),
"nbf": int(time.time()) + 3600, # Not valid for 1 hour
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
validation_options=AccessTokenValidationOptions(verify_nbf=True),
)
with pytest.raises(InvalidTokenError):
await validator.validate_token(token)
@pytest.mark.asyncio
async def test_validate_nbf_claim_disabled(self, rsa_joserfc_key, jwks_data):
"""Test that token with nbf in future is accepted when verify_nbf=False."""
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) + 7200, # expires in 2 hours
"iat": int(time.time()),
"nbf": int(time.time()) + 3600, # Not valid for 1 hour
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
validation_options=AccessTokenValidationOptions(verify_nbf=False),
)
# Should accept the token when nbf verification is disabled
user = await validator.validate_token(token)
assert user.user_id == "user123"
@pytest.mark.asyncio
async def test_validate_with_leeway(self, rsa_joserfc_key, jwks_data):
"""Test that leeway allows slightly expired tokens."""
# Token expired 30 seconds ago
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) - 30,
"iat": int(time.time()) - 3600,
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
# Validator with 60 second leeway should accept this token
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
validation_options=AccessTokenValidationOptions(leeway=60),
)
user = await validator.validate_token(token)
assert user.user_id == "user123"
class TestJWKSTokenValidatorEd25519:
"""Tests for JWKSTokenValidator with Ed25519 algorithm."""
@pytest.mark.asyncio
async def test_validate_valid_ed25519_token(self, valid_ed25519_token, ed25519_jwks_data):
"""Test validating a valid Ed25519 JWT token."""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = ed25519_jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://cloud.arcade.dev/.well-known/jwks/oauth2",
issuer="https://cloud.arcade.dev/oauth2",
audience="urn:arcade:mcp",
algorithm="Ed25519",
)
user = await validator.validate_token(valid_ed25519_token)
assert isinstance(user, ResourceOwner)
assert user.user_id == "user456"
assert user.email == "ed25519user@example.com"
assert user.claims["iss"] == "https://cloud.arcade.dev/oauth2"
@pytest.mark.asyncio
async def test_validate_expired_ed25519_token(self, expired_ed25519_token, ed25519_jwks_data):
"""Test validating an expired Ed25519 JWT token."""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = ed25519_jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://cloud.arcade.dev/.well-known/jwks/oauth2",
issuer="https://cloud.arcade.dev/oauth2",
audience="urn:arcade:mcp",
algorithm="Ed25519",
)
with pytest.raises(TokenExpiredError):
await validator.validate_token(expired_ed25519_token)
@pytest.mark.asyncio
async def test_ed25519_algorithm_mismatch(self, valid_jwt_token, ed25519_jwks_data):
"""Test that RS256 token is rejected when validator expects Ed25519."""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = ed25519_jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://cloud.arcade.dev/.well-known/jwks/oauth2",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
algorithm="Ed25519",
)
# RS256 token should be rejected when Ed25519 is expected
with pytest.raises(InvalidTokenError, match="algorithm"):
await validator.validate_token(valid_jwt_token)
@pytest.mark.asyncio
async def test_ed25519_wrong_audience(self, ed25519_joserfc_key, ed25519_jwks_data):
"""Test Ed25519 token with wrong audience is rejected."""
claims = {
"sub": "user456",
"iss": "https://cloud.arcade.dev/oauth2",
"aud": "wrong:audience",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "Ed25519", "kid": "ed25519-key-1"}
token = jwt.encode(header, claims, ed25519_joserfc_key, algorithms=["Ed25519"])
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = ed25519_jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://cloud.arcade.dev/.well-known/jwks/oauth2",
issuer="https://cloud.arcade.dev/oauth2",
audience="urn:arcade:mcp",
algorithm="Ed25519",
)
with pytest.raises(InvalidTokenError, match="audience"):
await validator.validate_token(token)
@pytest.mark.asyncio
async def test_eddsa_algorithm_alias(self, ed25519_joserfc_key, ed25519_jwks_data):
"""Test that EdDSA algorithm alias works for Ed25519."""
claims = {
"sub": "user456",
"email": "ed25519user@example.com",
"iss": "https://cloud.arcade.dev/oauth2",
"aud": "urn:arcade:mcp",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "Ed25519", "kid": "ed25519-key-1"}
token = jwt.encode(header, claims, ed25519_joserfc_key, algorithms=["Ed25519"])
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = ed25519_jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
# Use EdDSA alias
validator = JWKSTokenValidator(
jwks_uri="https://cloud.arcade.dev/.well-known/jwks/oauth2",
issuer="https://cloud.arcade.dev/oauth2",
audience="urn:arcade:mcp",
algorithm="EdDSA", # Using EdDSA alias
)
user = await validator.validate_token(token)
assert user.user_id == "user456"
def test_ed25519_supported_algorithm(self):
"""Test that Ed25519 is in supported algorithms."""
# Should not raise
validator = JWKSTokenValidator(
jwks_uri="https://example.com/jwks",
issuer="https://example.com",
audience="https://example.com",
algorithm="Ed25519",
)
assert validator.algorithm == "Ed25519"
def test_eddsa_supported_algorithm(self):
"""Test that EdDSA is in supported algorithms."""
# Should not raise
validator = JWKSTokenValidator(
jwks_uri="https://example.com/jwks",
issuer="https://example.com",
audience="https://example.com",
algorithm="EdDSA",
)
assert validator.algorithm == "EdDSA"
class TestArcadeASConfiguration:
"""Tests for Arcade AS configuration."""
@pytest.mark.asyncio
async def test_arcade_as_config(self, valid_ed25519_token, ed25519_jwks_data):
"""Test configuration matching Arcade AS."""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = ed25519_jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
# Configuration matching Arcade AS
resource_server_auth = ResourceServerAuth(
canonical_url="https://gateway-manager.arcade.dev/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://cloud.arcade.dev/oauth2",
issuer="https://cloud.arcade.dev/oauth2",
jwks_uri="https://cloud.arcade.dev/.well-known/jwks/oauth2",
algorithm="Ed25519",
expected_audiences=[
"urn:arcade:mcp",
"https://gateway-manager.arcade.dev/mcp",
],
)
],
)
user = await resource_server_auth.validate_token(valid_ed25519_token)
assert user.user_id == "user456"
assert user.email == "ed25519user@example.com"
def test_arcade_as_metadata(self):
"""Test OAuth metadata for Arcade AS configuration."""
resource_server_auth = ResourceServerAuth(
canonical_url="https://gateway-manager.arcade.dev/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://cloud.arcade.dev/oauth2",
issuer="https://cloud.arcade.dev/oauth2",
jwks_uri="https://cloud.arcade.dev/.well-known/jwks/oauth2",
algorithm="Ed25519",
expected_audiences=["urn:arcade:mcp"],
)
],
)
metadata = resource_server_auth.get_resource_metadata()
assert metadata["resource"] == "https://gateway-manager.arcade.dev/mcp"
assert metadata["authorization_servers"] == ["https://cloud.arcade.dev/oauth2"]
# ResourceServerAuth Tests
class TestResourceServerAuth:
"""Tests for ResourceServerAuth class."""
def test_supports_oauth_discovery(self):
"""Test that ResourceServerAuth supports OAuth discovery."""
resource_server_auth = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/.well-known/jwks.json",
)
],
)
assert resource_server_auth.supports_oauth_discovery() is True
def test_get_resource_metadata(self):
"""Test getting OAuth Protected Resource Metadata."""
resource_server_auth = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/.well-known/jwks.json",
)
],
)
metadata = resource_server_auth.get_resource_metadata()
assert metadata["resource"] == "https://mcp.example.com/mcp"
assert metadata["authorization_servers"] == ["https://auth.example.com"]
assert metadata["bearer_methods_supported"] == ["header"]
@pytest.mark.asyncio
async def test_expected_audiences_override(self, rsa_keypair, jwks_data, rsa_joserfc_key):
"""Test that expected_audiences overrides canonical_url for audience validation."""
# Token with custom audience
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": "my-authkit-client-id",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
resource_server_auth = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/.well-known/jwks.json",
expected_audiences=["my-authkit-client-id"],
)
],
)
user = await resource_server_auth.validate_token(token)
assert user.user_id == "user123"
@pytest.mark.asyncio
async def test_expected_audiences_multiple_values(
self, rsa_keypair, jwks_data, rsa_joserfc_key
):
"""Test that multiple expected_audiences work correctly."""
# Token with one of the expected audiences
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": "secondary-client-id",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
resource_server_auth = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/.well-known/jwks.json",
expected_audiences=[
"primary-client-id",
"secondary-client-id",
"tertiary-client-id",
],
)
],
)
user = await resource_server_auth.validate_token(token)
assert user.user_id == "user123"
@pytest.mark.asyncio
async def test_expected_audiences_defaults_to_canonical_url(
self, rsa_keypair, jwks_data, rsa_joserfc_key
):
"""Test that without expected_audiences, canonical_url is used for audience validation."""
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
resource_server_auth = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/.well-known/jwks.json",
)
],
)
user = await resource_server_auth.validate_token(token)
assert user.user_id == "user123"
@pytest.mark.asyncio
async def test_expected_audiences_wrong_audience_rejected(
self, rsa_keypair, jwks_data, rsa_joserfc_key
):
"""Test that tokens with wrong audience are rejected even with expected_audiences."""
claims = {
"sub": "user123",
"iss": "https://auth.example.com",
"aud": "wrong-client-id", # Not in expected_audiences list
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
resource_server_auth = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/.well-known/jwks.json",
expected_audiences=["correct-client-id"],
)
],
)
with pytest.raises(InvalidTokenError):
await resource_server_auth.validate_token(token)
# ResourceServerMiddleware Tests
class TestResourceServerMiddleware:
"""Tests for ResourceServerMiddleware class."""
@pytest.mark.asyncio
async def test_authenticated_request(self, valid_jwt_token, jwks_data):
"""Test authenticated request passes through."""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
)
# Mock app
app_called = False
async def mock_app(scope, receive, send):
nonlocal app_called
app_called = True
assert "resource_owner" in scope
assert scope["resource_owner"].user_id == "user123"
middleware = ResourceServerMiddleware(
mock_app,
validator,
"https://mcp.example.com/mcp",
)
# Create mock request
scope = {
"type": "http",
"method": "POST",
"headers": [(b"authorization", f"Bearer {valid_jwt_token}".encode())],
}
async def receive():
return {"type": "http.request", "body": b""}
async def send(message):
pass
await middleware(scope, receive, send)
assert app_called is True
@pytest.mark.asyncio
async def test_missing_authorization_header(self, jwks_data):
"""Test request without Authorization header returns 401."""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/.well-known/jwks.json",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
)
async def mock_app(scope, receive, send):
pytest.fail("App should not be called")
middleware = ResourceServerMiddleware(
mock_app,
validator,
"https://mcp.example.com/mcp",
)
# mock request w/o auth header
scope = {
"type": "http",
"method": "POST",
"headers": [],
}
async def receive():
return {"type": "http.request", "body": b""}
response_sent = {}
async def send(message):
if message["type"] == "http.response.start":
response_sent["status"] = message["status"]
response_sent["headers"] = dict(message.get("headers", []))
await middleware(scope, receive, send)
assert response_sent["status"] == 401
assert any(k.lower() == b"www-authenticate" for k in response_sent["headers"])
@pytest.mark.asyncio
async def test_www_authenticate_header_format(self, jwks_data):
"""Test WWW-Authenticate header format compliance."""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
resource_server = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/.well-known/jwks.json",
)
],
)
async def mock_app(scope, receive, send):
pytest.fail("App should not be called")
middleware = ResourceServerMiddleware(
mock_app,
resource_server,
"https://mcp.example.com/mcp",
)
scope = {
"type": "http",
"method": "POST",
"headers": [],
}
async def receive():
return {"type": "http.request", "body": b""}
response_headers = {}
async def send(message):
if message["type"] == "http.response.start":
response_headers.update(dict(message.get("headers", [])))
await middleware(scope, receive, send)
www_auth = response_headers.get(b"www-authenticate", b"").decode()
assert "Bearer" in www_auth
assert "resource_metadata=" in www_auth
assert "/.well-known/oauth-protected-resource" in www_auth
class TestEnvVarConfiguration:
"""Tests for front-door auth env var configuration support."""
@pytest.mark.asyncio
async def test_resource_server_param_precedence(self, monkeypatch):
"""Test that explicit parameters take precedence over environment variables."""
monkeypatch.setenv("MCP_RESOURCE_SERVER_CANONICAL_URL", "https://env-mcp.example.com")
monkeypatch.setenv(
"MCP_RESOURCE_SERVER_AUTHORIZATION_SERVERS",
'[{"authorization_server_url":"https://env.example.com","issuer":"https://env.example.com","jwks_uri":"https://env.example.com/jwks"}]',
)
resource_server = ResourceServerAuth(
canonical_url="https://param-mcp.example.com",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://param.example.com",
issuer="https://param.example.com",
jwks_uri="https://param.example.com/jwks",
)
],
)
# Explicit parameters should take precedence over env vars
assert resource_server.canonical_url == "https://param-mcp.example.com"
metadata = resource_server.get_resource_metadata()
assert metadata["authorization_servers"] == ["https://param.example.com"]
@pytest.mark.asyncio
async def test_resource_server_all_env_vars(self, monkeypatch):
"""Test ResourceServerAuth with all env vars, no parameters."""
monkeypatch.setenv("MCP_RESOURCE_SERVER_CANONICAL_URL", "https://mcp.example.com/mcp")
monkeypatch.setenv(
"MCP_RESOURCE_SERVER_AUTHORIZATION_SERVERS",
'[{"authorization_server_url":"https://auth.example.com","issuer":"https://auth.example.com","jwks_uri":"https://auth.example.com/jwks","algorithm":"RS256","expected_audiences":["custom-client-id"]}]',
)
resource_server_auth = ResourceServerAuth()
assert resource_server_auth.canonical_url == "https://mcp.example.com/mcp"
metadata = resource_server_auth.get_resource_metadata()
assert metadata["authorization_servers"] == ["https://auth.example.com"]
def test_resource_server_missing_required(self):
"""Test that missing required fields raise ValueError."""
with pytest.raises(ValueError, match="'canonical_url' required"):
ResourceServerAuth(
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/jwks",
)
],
# Missing canonical_url
)
@pytest.mark.asyncio
async def test_worker_no_canonical_url_for_jwks_validator(self):
"""Test that worker doesn't require canonical_url for JWKSTokenValidator."""
jwt_validator = JWKSTokenValidator(
jwks_uri="https://auth.example.com/jwks",
issuer="https://auth.example.com",
audience="https://mcp.example.com/mcp",
)
catalog = ToolCatalog()
# Shouldn't raise b/c JWKSTokenValidator doesn't support OAuth discovery
app = create_arcade_mcp(catalog, resource_server_validator=jwt_validator)
assert app is not None
def test_worker_requires_canonical_url_for_resource_server(self):
"""Test that ResourceServerAuth validation happens during init."""
with pytest.raises(ValueError, match="'canonical_url' required"):
ResourceServerAuth(
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/jwks",
)
],
# Missing canonical_url
)
class TestMultipleAuthorizationServers:
"""Tests for multiple authorization server support."""
@pytest.mark.asyncio
async def test_resource_server_multiple_as_shared_jwks(self, jwks_data, valid_jwt_token):
"""Test multiple AS URLs with same JWKS"""
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
resource_server_auth = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth-us.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/jwks",
),
AuthorizationServerEntry(
authorization_server_url="https://auth-eu.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/jwks",
),
],
)
# Verify that metadata returns all Auth Server URLs
metadata = resource_server_auth.get_resource_metadata()
assert metadata["resource"] == "https://mcp.example.com/mcp"
assert metadata["authorization_servers"] == [
"https://auth-us.example.com",
"https://auth-eu.example.com",
]
# Verify that token validation works
user = await resource_server_auth.validate_token(valid_jwt_token)
assert user.user_id == "user123"
assert user.email == "user@example.com"
@pytest.mark.asyncio
async def test_resource_server_multiple_as_different_jwks(
self, rsa_keypair, jwks_data, rsa_joserfc_key
):
"""Test multiple AS with different JWKS (multi-IdP)."""
claims1 = {
"sub": "user123",
"email": "user@workos.com",
"iss": "https://workos.authkit.app",
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header1 = {"alg": "RS256", "kid": "test-key-1"}
token1 = jwt.encode(header1, claims1, rsa_joserfc_key)
claims2 = {
"sub": "user456",
"email": "user@keycloak.com",
"iss": "http://localhost:8080/realms/mcp-test",
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header2 = {"alg": "RS256", "kid": "test-key-1"}
token2 = jwt.encode(header2, claims2, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
resource_server_auth = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://workos.authkit.app",
issuer="https://workos.authkit.app",
jwks_uri="https://workos.authkit.app/oauth2/jwks",
),
AuthorizationServerEntry(
authorization_server_url="http://localhost:8080/realms/mcp-test",
issuer="http://localhost:8080/realms/mcp-test",
jwks_uri="http://localhost:8080/realms/mcp-test/protocol/openid-connect/certs",
algorithm="RS256",
),
],
)
# Verify metadata returns all Auth Server URLs
metadata = resource_server_auth.get_resource_metadata()
assert metadata["authorization_servers"] == [
"https://workos.authkit.app",
"http://localhost:8080/realms/mcp-test",
]
# Verify tokens from both Auth Servers work
user1 = await resource_server_auth.validate_token(token1)
assert user1.user_id == "user123"
assert user1.email == "user@workos.com"
user2 = await resource_server_auth.validate_token(token2)
assert user2.user_id == "user456"
assert user2.email == "user@keycloak.com"
@pytest.mark.asyncio
async def test_resource_server_rejects_unconfigured_as(
self, rsa_keypair, jwks_data, rsa_joserfc_key
):
"""Test that tokens from unlisted AS are rejected."""
claims = {
"sub": "user123",
"email": "user@evil.com",
"iss": "https://evil.com", # Not in configured list (unauthorized issuer)
"aud": "https://mcp.example.com/mcp",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
}
header = {"alg": "RS256", "kid": "test-key-1"}
token = jwt.encode(header, claims, rsa_joserfc_key)
with patch("httpx.AsyncClient.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = jwks_data
mock_response.raise_for_status = Mock()
mock_get.return_value = mock_response
resource_server_auth = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth.example.com",
issuer="https://auth.example.com",
jwks_uri="https://auth.example.com/jwks",
)
],
)
# Should reject token from unauthorized Auth Server (issuer)
with pytest.raises(
InvalidTokenError,
match="Token validation failed for all configured authorization servers",
):
await resource_server_auth.validate_token(token)
def test_authorization_servers_env_var_parsing_json(self, monkeypatch):
"""Test parsing JSON array of AS configs from env var."""
monkeypatch.setenv("MCP_RESOURCE_SERVER_CANONICAL_URL", "https://mcp.example.com/mcp")
monkeypatch.setenv(
"MCP_RESOURCE_SERVER_AUTHORIZATION_SERVERS",
'[{"authorization_server_url": "https://auth1.com", "issuer": "https://auth1.com", "jwks_uri": "https://auth1.com/jwks"}]',
)
resource_server_auth = ResourceServerAuth()
metadata = resource_server_auth.get_resource_metadata()
assert metadata["authorization_servers"] == ["https://auth1.com"]
def test_resource_metadata_multiple_as(self):
"""Test that resource metadata returns all AS URLs."""
resource_server_auth = ResourceServerAuth(
canonical_url="https://mcp.example.com/mcp",
authorization_servers=[
AuthorizationServerEntry(
authorization_server_url="https://auth1.example.com",
issuer="https://auth1.example.com",
jwks_uri="https://auth1.example.com/jwks",
),
AuthorizationServerEntry(
authorization_server_url="https://auth2.example.com",
issuer="https://auth2.example.com",
jwks_uri="https://auth2.example.com/jwks",
),
AuthorizationServerEntry(
authorization_server_url="https://auth3.example.com",
issuer="https://auth3.example.com",
jwks_uri="https://auth3.example.com/jwks",
),
],
)
metadata = resource_server_auth.get_resource_metadata()
assert metadata["resource"] == "https://mcp.example.com/mcp"
assert len(metadata["authorization_servers"]) == 3
assert "https://auth1.example.com" in metadata["authorization_servers"]
assert "https://auth2.example.com" in metadata["authorization_servers"]
assert "https://auth3.example.com" in metadata["authorization_servers"]