Pytests for Spotify tools (#204)

This commit is contained in:
Renato Byrro 2025-01-14 12:45:32 -08:00 committed by GitHub
parent 5022d339a2
commit 62327c30a7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 721 additions and 20 deletions

View file

@ -12,3 +12,24 @@ ENDPOINTS = {
"tracks_get_track": "/tracks/{track_id}",
"search": "/search",
}
RESPONSE_MSGS = {
"artist_not_found": "Artist '{artist_name}' not found",
"track_not_found": "Track '{track_name}' not found",
"no_track_to_adjust_position": "No track to adjust position",
"playback_position_adjusted": "Playback position adjusted",
"no_track_to_go_back_to": "No track to go back to",
"playback_skipped_to_previous_track": "Playback skipped to previous track",
"no_track_to_skip": "No track to skip",
"playback_skipped_to_next_track": "Playback skipped to next track",
"playback_paused": "Playback paused",
"playback_resumed": "Playback resumed",
"no_track_to_resume": "No track to resume",
"no_track_to_pause": "No track to pause",
"no_track_to_play": "No track to play",
"no_available_devices": "No available devices",
"track_is_already_paused": "Track is already paused",
"track_is_already_playing": "Track is already playing",
"playback_started": "Playback started",
"no_active_device": "Cannot start playback because no active device is available",
}

View file

@ -1,9 +1,11 @@
from typing import Annotated, Optional
import httpx
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Spotify
from arcade.sdk.errors import RetryableToolError
from arcade.sdk.errors import RetryableToolError, ToolExecutionError
from arcade_spotify.tools.constants import RESPONSE_MSGS
from arcade_spotify.tools.models import Device, SearchType
from arcade_spotify.tools.search import search
from arcade_spotify.tools.utils import (
@ -50,7 +52,7 @@ async def adjust_playback_position(
if relative_position_ms is not None:
playback_state = await get_playback_state(context)
if playback_state.get("device_id") is None:
return "No track to adjust position"
return RESPONSE_MSGS["no_track_to_adjust_position"]
absolute_position_ms = playback_state["progress_ms"] + relative_position_ms
@ -59,14 +61,17 @@ async def adjust_playback_position(
url = get_url("player_seek_to_position")
params = {"position_ms": absolute_position_ms}
response = await send_spotify_request(context, "PUT", url, params=params)
try:
response = await send_spotify_request(context, "PUT", url, params=params)
except httpx.HTTPStatusError as e:
raise ToolExecutionError(f"Failed to adjust playback position: {e}") from e
if response.status_code == 404:
return "No track to adjust position"
return RESPONSE_MSGS["no_track_to_adjust_position"]
response.raise_for_status()
return "Playback position adjusted"
return RESPONSE_MSGS["playback_position_adjusted"]
# NOTE: This tool only works for Spotify Premium users
@ -80,11 +85,11 @@ async def skip_to_previous_track(
response = await send_spotify_request(context, "POST", url)
if response.status_code == 404:
return "No track to go back to"
return RESPONSE_MSGS["no_track_to_go_back_to"]
response.raise_for_status()
return "Playback skipped to previous track"
return RESPONSE_MSGS["playback_skipped_to_previous_track"]
# NOTE: This tool only works for Spotify Premium users
@ -98,11 +103,11 @@ async def skip_to_next_track(
response = await send_spotify_request(context, "POST", url)
if response.status_code == 404:
return "No track to skip"
return RESPONSE_MSGS["no_track_to_skip"]
response.raise_for_status()
return "Playback skipped to next track"
return RESPONSE_MSGS["playback_skipped_to_next_track"]
# NOTE: This tool only works for Spotify Premium users
@ -115,17 +120,17 @@ async def pause_playback(
# There is no current state, therefore nothing to pause
if playback_state.get("device_id") is None:
return "No track to pause"
return RESPONSE_MSGS["no_track_to_pause"]
# Track is already paused
if playback_state.get("is_playing") is False:
return "Track is already paused"
return RESPONSE_MSGS["track_is_already_paused"]
url = get_url("player_pause_playback")
response = await send_spotify_request(context, "PUT", url)
response.raise_for_status()
return "Playback paused"
return RESPONSE_MSGS["playback_paused"]
# NOTE: This tool only works for Spotify Premium users
@ -142,17 +147,17 @@ async def resume_playback(
# There is no current state, therefore nothing to resume
if playback_state.get("device_id") is None:
return "No track to resume"
return RESPONSE_MSGS["no_track_to_resume"]
# Track is already playing
if playback_state.get("is_playing") is True:
return "Track is already playing"
return RESPONSE_MSGS["track_is_already_playing"]
url = get_url("player_modify_playback")
response = await send_spotify_request(context, "PUT", url)
response.raise_for_status()
return "Playback resumed"
return RESPONSE_MSGS["playback_resumed"]
# NOTE: This tool only works for Spotify Premium users
@ -195,11 +200,11 @@ async def start_tracks_playback_by_id(
response = await send_spotify_request(context, "PUT", url, params=params, json_data=body)
if response.status_code == 404:
return "Cannot start playback because no active device is available"
return RESPONSE_MSGS["no_active_device"]
response.raise_for_status()
return "Playback started"
return RESPONSE_MSGS["playback_started"]
@tool(requires_auth=Spotify(scopes=["user-read-playback-state"]))
@ -244,7 +249,7 @@ async def play_artist_by_name(
q = f"artist:{name}"
search_results = await search(context, q, [SearchType.TRACK], 5)
if not search_results["tracks"]["items"]:
message = f"Artist '{name}' not found."
message = RESPONSE_MSGS["artist_not_found"].format(artist_name=name)
raise RetryableToolError(
message,
additional_prompt_content=f"{message} Try a different artist name.",
@ -274,7 +279,7 @@ async def play_track_by_name(
search_results = await search(context, q, [SearchType.TRACK], 1)
if not search_results["tracks"]["items"]:
message = f"No track exists with name '{track_name}'"
message = RESPONSE_MSGS["track_not_found"].format(track_name=track_name)
if artist_name:
message += f" by '{artist_name}'"
raise RetryableToolError(

View file

@ -0,0 +1,34 @@
import pytest
from arcade.sdk import ToolContext
@pytest.fixture
def tool_context():
"""Fixture for the ToolContext with mock authorization."""
return ToolContext(authorization={"token": "test_token", "user_id": "test_user"})
@pytest.fixture
def mock_httpx_client(mocker):
"""Fixture to mock the httpx.AsyncClient."""
# Mock the AsyncClient context manager
mock_client = mocker.patch("httpx.AsyncClient", autospec=True)
async_mock_client = mock_client.return_value.__aenter__.return_value
return async_mock_client
@pytest.fixture
def sample_track():
"""Fixture for a sample track."""
return {
"album": {"id": "1234567890", "name": "Test Album", "uri": "spotify:album:1234567890"},
"artists": [{"name": "Test Artist", "type": "artist", "uri": "spotify:artist:1234567890"}],
"available_markets": ["us"],
"duration_ms": 123456,
"id": "1234567890",
"is_playable": True,
"name": "Test Track",
"popularity": 100,
"type": "track",
"uri": "spotify:track:1234567890",
}

View file

@ -0,0 +1,515 @@
from unittest.mock import MagicMock, patch
import httpx
import pytest
from arcade.sdk.errors import RetryableToolError, ToolExecutionError
from arcade_spotify.tools.constants import RESPONSE_MSGS
from arcade_spotify.tools.models import SearchType
from arcade_spotify.tools.player import (
adjust_playback_position,
get_available_devices,
get_currently_playing,
get_playback_state,
pause_playback,
play_artist_by_name,
play_track_by_name,
resume_playback,
skip_to_next_track,
skip_to_previous_track,
start_tracks_playback_by_id,
)
from arcade_spotify.tools.utils import get_url
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tool_function, tool_kwargs",
[
(adjust_playback_position, {"absolute_position_ms": 10000}),
(get_available_devices, {}),
(get_currently_playing, {}),
(get_playback_state, {}),
(pause_playback, {}),
(resume_playback, {}),
(start_tracks_playback_by_id, {"track_ids": ["1234567890"], "position_ms": 10000}),
(skip_to_previous_track, {}),
(skip_to_next_track, {}),
],
)
async def test_too_many_requests_http_error(
tool_function, tool_kwargs, tool_context, mock_httpx_client
):
mock_response = MagicMock()
mock_response.status_code = 429
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
"Too Many Requests", request=MagicMock(), response=MagicMock(status_code=429)
)
mock_httpx_client.request.return_value = mock_response
with pytest.raises(ToolExecutionError):
await tool_function(context=tool_context, **tool_kwargs)
@pytest.mark.asyncio
@patch("arcade_spotify.tools.player.get_playback_state")
async def test_adjust_playback_position_absolute_success(
mock_get_playback_state, tool_context, mock_httpx_client
):
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
response = await adjust_playback_position(context=tool_context, absolute_position_ms=10000)
assert response == RESPONSE_MSGS["playback_position_adjusted"]
mock_get_playback_state.assert_not_called()
mock_httpx_client.request.assert_called_once_with(
"PUT",
get_url("player_seek_to_position"),
headers={"Authorization": f"Bearer {tool_context.authorization.token}"},
params={"position_ms": 10000},
json=None,
)
@pytest.mark.asyncio
@patch("arcade_spotify.tools.player.get_playback_state")
async def test_adjust_playback_position_relative_success(
mock_get_playback_state, tool_context, mock_httpx_client
):
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
mock_get_playback_state.return_value = {"device_id": "1234567890", "progress_ms": 10000}
response = await adjust_playback_position(context=tool_context, relative_position_ms=10000)
assert response == RESPONSE_MSGS["playback_position_adjusted"]
mock_get_playback_state.assert_called_once_with(tool_context)
mock_httpx_client.request.assert_called_once_with(
"PUT",
get_url("player_seek_to_position"),
headers={"Authorization": f"Bearer {tool_context.authorization.token}"},
params={"position_ms": 20000},
json=None,
)
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tool_function, tool_kwargs",
[
# Both arguments provided
(
adjust_playback_position,
{"absolute_position_ms": 10000, "relative_position_ms": 10000},
),
# No arguments provided
(
adjust_playback_position,
{},
),
],
)
@patch("arcade_spotify.tools.player.get_playback_state")
async def test_adjust_playback_position_wrong_arguments_error(
mock_get_playback_state, tool_context, mock_httpx_client, tool_function, tool_kwargs
):
with pytest.raises(RetryableToolError):
await tool_function(context=tool_context, **tool_kwargs)
mock_get_playback_state.assert_not_called()
mock_httpx_client.assert_not_called()
@pytest.mark.asyncio
@patch("arcade_spotify.tools.player.get_playback_state")
async def test_adjust_playback_position_no_device_error(
mock_get_playback_state, tool_context, mock_httpx_client
):
mock_get_playback_state.return_value = {"device_id": None}
response = await adjust_playback_position(context=tool_context, relative_position_ms=10000)
assert response == RESPONSE_MSGS["no_track_to_adjust_position"]
mock_get_playback_state.assert_called_once_with(tool_context)
mock_httpx_client.assert_not_called()
@pytest.mark.asyncio
@patch("arcade_spotify.tools.player.get_playback_state")
async def test_adjust_playback_position_not_found_error(
mock_get_playback_state, tool_context, mock_httpx_client
):
mock_response = MagicMock()
mock_response.status_code = 404
mock_httpx_client.request.return_value = mock_response
response = await adjust_playback_position(context=tool_context, absolute_position_ms=10000)
assert response == RESPONSE_MSGS["no_track_to_adjust_position"]
mock_get_playback_state.assert_not_called()
mock_httpx_client.request.assert_called_once_with(
"PUT",
get_url("player_seek_to_position"),
headers={"Authorization": f"Bearer {tool_context.authorization.token}"},
params={"position_ms": 10000},
json=None,
)
@pytest.mark.asyncio
async def test_skip_to_previous_track_success(tool_context, mock_httpx_client):
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
response = await skip_to_previous_track(context=tool_context)
assert response == RESPONSE_MSGS["playback_skipped_to_previous_track"]
@pytest.mark.asyncio
async def test_skip_to_previous_track_not_found_error(tool_context, mock_httpx_client):
mock_response = MagicMock()
mock_response.status_code = 404
mock_httpx_client.request.return_value = mock_response
response = await skip_to_previous_track(context=tool_context)
assert response == RESPONSE_MSGS["no_track_to_go_back_to"]
@pytest.mark.asyncio
async def test_skip_to_next_track_success(tool_context, mock_httpx_client):
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
response = await skip_to_next_track(context=tool_context)
assert response == RESPONSE_MSGS["playback_skipped_to_next_track"]
@pytest.mark.asyncio
async def test_skip_to_next_track_not_found_error(tool_context, mock_httpx_client):
mock_response = MagicMock()
mock_response.status_code = 404
mock_httpx_client.request.return_value = mock_response
response = await skip_to_next_track(context=tool_context)
assert response == RESPONSE_MSGS["no_track_to_skip"]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tool_function, mock_is_playing, expected_message",
[
(pause_playback, True, RESPONSE_MSGS["playback_paused"]),
(resume_playback, False, RESPONSE_MSGS["playback_resumed"]),
],
)
@patch("arcade_spotify.tools.player.get_playback_state")
async def test_change_playback_state_success(
mock_get_playback_state,
tool_context,
tool_function,
mock_is_playing,
expected_message,
mock_httpx_client,
):
mock_get_playback_state.return_value = {
"device_id": "1234567890",
"is_playing": mock_is_playing,
}
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
response = await tool_function(context=tool_context)
assert response == expected_message
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tool_function, expected_message",
[
(pause_playback, RESPONSE_MSGS["no_track_to_pause"]),
(resume_playback, RESPONSE_MSGS["no_track_to_resume"]),
],
)
@patch("arcade_spotify.tools.player.get_playback_state")
async def test_change_playback_state_no_device_running(
mock_get_playback_state, tool_context, tool_function, expected_message, mock_httpx_client
):
mock_get_playback_state.return_value = {"device_id": None}
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
response = await tool_function(context=tool_context)
assert response == expected_message
mock_httpx_client.assert_not_called()
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tool_function, mock_is_playing, expected_message",
[
(pause_playback, False, RESPONSE_MSGS["track_is_already_paused"]),
(resume_playback, True, RESPONSE_MSGS["track_is_already_playing"]),
],
)
@patch("arcade_spotify.tools.player.get_playback_state")
async def test_change_playback_state_already_set_success(
mock_get_playback_state,
tool_context,
tool_function,
mock_is_playing,
expected_message,
mock_httpx_client,
):
mock_get_playback_state.return_value = {
"device_id": "1234567890",
"is_playing": mock_is_playing,
}
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
response = await tool_function(context=tool_context)
assert response == expected_message
mock_httpx_client.assert_not_called()
@pytest.mark.asyncio
@patch("arcade_spotify.tools.player.get_available_devices")
async def test_start_tracks_playback_by_id_success(
mock_get_available_devices, tool_context, mock_httpx_client
):
mock_get_available_devices.return_value = {
"devices": [
{
"id": "1234567890",
"is_active": True,
"name": "Test Device",
"type": "Computer",
"is_private_session": False,
"is_restricted": False,
"supports_volume": True,
"volume_percent": 100,
}
]
}
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
response = await start_tracks_playback_by_id(
context=tool_context, track_ids=["1234567890"], position_ms=10000
)
assert response == RESPONSE_MSGS["playback_started"]
@pytest.mark.asyncio
@patch("arcade_spotify.tools.player.get_available_devices")
async def test_start_tracks_playback_by_id_no_active_device(
mock_get_available_devices, tool_context, mock_httpx_client
):
mock_get_available_devices.return_value = {"devices": []}
mock_response = MagicMock()
mock_response.status_code = 404
mock_httpx_client.request.return_value = mock_response
response = await start_tracks_playback_by_id(
context=tool_context, track_ids=["1234567890"], position_ms=10000
)
assert response == RESPONSE_MSGS["no_active_device"]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tool_function, expected_message",
[
(get_playback_state, RESPONSE_MSGS["playback_started"]),
(get_currently_playing, RESPONSE_MSGS["playback_started"]),
],
)
async def test_get_state_success(
tool_context,
mock_httpx_client,
tool_function,
expected_message,
):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"device": {
"id": "1234567890",
"is_active": True,
"name": "Test Device",
"type": "Computer",
},
"currently_playing_type": "track",
"is_playing": True,
"progress_ms": 10000,
"message": "Playback started",
}
mock_httpx_client.request.return_value = mock_response
response = await tool_function(context=tool_context)
assert response["device_id"] == "1234567890"
assert response["device_name"] == "Test Device"
assert response["is_playing"] is True
assert response["progress_ms"] == 10000
assert response["message"] == "Playback started"
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tool_function",
[get_playback_state, get_currently_playing],
)
async def test_get_state_playback_not_active(tool_context, mock_httpx_client, tool_function):
mock_response = MagicMock()
mock_response.status_code = 204
mock_httpx_client.request.return_value = mock_response
response = await tool_function(context=tool_context)
assert response["is_playing"] is False
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tool_function, tool_kwargs, expected_search_query, expected_limit",
[
(play_artist_by_name, {"name": "Test Artist"}, "artist:Test Artist", 5),
(play_track_by_name, {"track_name": "Test Track"}, "track:Test Track", 1),
],
)
@patch("arcade_spotify.tools.player.start_tracks_playback_by_id")
@patch("arcade_spotify.tools.player.search")
async def test_play_by_name_success(
mock_search,
mock_start_tracks_playback_by_id,
tool_context,
tool_function,
tool_kwargs,
expected_search_query,
expected_limit,
mock_httpx_client,
):
track_id = "1234567890"
mock_search.return_value = {"tracks": {"items": [{"id": track_id, "name": "Test Track"}]}}
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
mock_start_tracks_playback_by_id.return_value = RESPONSE_MSGS["playback_started"]
response = await tool_function(context=tool_context, **tool_kwargs)
assert response == RESPONSE_MSGS["playback_started"]
mock_search.assert_called_once_with(
tool_context,
expected_search_query,
[SearchType.TRACK],
expected_limit,
)
mock_start_tracks_playback_by_id.assert_called_once_with(tool_context, [track_id])
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tool_function, tool_kwargs, expected_search_query, expected_limit, expected_message",
[
(
play_artist_by_name,
{"name": "Test Artist"},
"artist:Test Artist",
5,
RESPONSE_MSGS["artist_not_found"].format(artist_name="Test Artist"),
),
(
play_track_by_name,
{"track_name": "Test Track"},
"track:Test Track",
1,
RESPONSE_MSGS["track_not_found"].format(track_name="Test Track"),
),
],
)
@patch("arcade_spotify.tools.player.start_tracks_playback_by_id")
@patch("arcade_spotify.tools.player.search")
async def test_play_by_name_no_tracks_found(
mock_search,
mock_start_tracks_playback_by_id,
tool_context,
tool_function,
tool_kwargs,
expected_search_query,
expected_limit,
expected_message,
mock_httpx_client,
):
mock_search.return_value = {"tracks": {"items": []}}
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
mock_start_tracks_playback_by_id.return_value = RESPONSE_MSGS["playback_started"]
with pytest.raises(RetryableToolError) as e:
await tool_function(context=tool_context, **tool_kwargs)
assert e.value.message == expected_message
mock_search.assert_called_once_with(
tool_context, expected_search_query, [SearchType.TRACK], expected_limit
)
mock_start_tracks_playback_by_id.assert_not_called()
@pytest.mark.asyncio
@patch("arcade_spotify.tools.player.start_tracks_playback_by_id")
@patch("arcade_spotify.tools.player.search")
async def test_play_track_by_name_with_artist_success(
mock_search, mock_start_tracks_playback_by_id, tool_context, mock_httpx_client
):
track_id = "1234567890"
mock_search.return_value = {"tracks": {"items": [{"id": track_id, "name": "Test Track"}]}}
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
response = await play_track_by_name(
context=tool_context, track_name="Test Track", artist_name="Test Artist"
)
assert response == str(mock_start_tracks_playback_by_id.return_value)
mock_search.assert_called_once_with(
tool_context, "track:Test Track artist:Test Artist", [SearchType.TRACK], 1
)
mock_start_tracks_playback_by_id.assert_called_once_with(tool_context, [track_id])
@pytest.mark.asyncio
async def test_get_available_devices_success(tool_context, mock_httpx_client):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"devices": [{"id": "1234567890", "name": "Test Device", "type": "Computer"}]
}
mock_httpx_client.request.return_value = mock_response
response = await get_available_devices(context=tool_context)
assert response == dict(mock_response.json())

View file

@ -0,0 +1,59 @@
from unittest.mock import MagicMock
import httpx
import pytest
from arcade.sdk.errors import ToolExecutionError
from arcade_spotify.tools.models import SearchType
from arcade_spotify.tools.search import search
from arcade_spotify.tools.utils import get_url
@pytest.mark.asyncio
async def test_search_success(tool_context, mock_httpx_client, sample_track):
sample_tracks = []
for i in range(4):
sample_track = sample_track.copy()
sample_track["id"] = f"{i}"
sample_tracks.append(sample_track)
search_response = {
"tracks": {
"href": "https://api.spotify.com/v1/me/shows?offset=0&limit=20",
"limit": 20,
"next": "https://api.spotify.com/v1/me/shows?offset=1&limit=1",
"offset": 0,
"previous": "https://api.spotify.com/v1/me/shows?offset=1&limit=1",
"total": 4,
"items": sample_tracks,
},
}
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = search_response
mock_httpx_client.request.return_value = mock_response
result = await search(tool_context, "test", [SearchType.TRACK], 4)
assert result == search_response
mock_httpx_client.request.assert_called_once_with(
"GET",
get_url("search", q="test"),
headers={"Authorization": f"Bearer {tool_context.authorization.token}"},
params={"q": "test", "type": SearchType.TRACK.value, "limit": 4},
json=None,
)
@pytest.mark.asyncio
async def test_search_rate_limit_error(tool_context, mock_httpx_client):
mock_response = MagicMock()
mock_response = httpx.HTTPStatusError(
"Too Many Requests", request=MagicMock(), response=MagicMock(status_code=429)
)
mock_httpx_client.request.side_effect = mock_response
with pytest.raises(ToolExecutionError):
await search(tool_context, "test", [SearchType.TRACK], 4)

View file

@ -0,0 +1,40 @@
from unittest.mock import MagicMock
import httpx
import pytest
from arcade.sdk.errors import ToolExecutionError
from arcade_spotify.tools.tracks import get_track_from_id
from arcade_spotify.tools.utils import get_url
@pytest.mark.asyncio
async def test_get_track_from_id_success(tool_context, mock_httpx_client, sample_track):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = sample_track
mock_httpx_client.request.return_value = mock_response
result = await get_track_from_id(tool_context, "1234567890")
assert result == sample_track
mock_httpx_client.request.assert_called_once_with(
"GET",
get_url("tracks_get_track", track_id="1234567890"),
headers={"Authorization": f"Bearer {tool_context.authorization.token}"},
params=None,
json=None,
)
@pytest.mark.asyncio
async def test_get_track_from_id_rate_limit_error(tool_context, mock_httpx_client):
mock_response = MagicMock()
mock_response = httpx.HTTPStatusError(
"Too Many Requests", request=MagicMock(), response=MagicMock(status_code=429)
)
mock_httpx_client.request.side_effect = mock_response
with pytest.raises(ToolExecutionError):
await get_track_from_id(tool_context, "1234567890")

View file

@ -1,5 +1,32 @@
from unittest.mock import MagicMock
import pytest
from arcade_spotify.tools.models import PlaybackState
from arcade_spotify.tools.utils import convert_to_playback_state
from arcade_spotify.tools.utils import convert_to_playback_state, send_spotify_request
@pytest.mark.asyncio
async def test_send_spotify_request(tool_context, mock_httpx_client):
mock_response = MagicMock()
mock_response.status_code = 200
mock_httpx_client.request.return_value = mock_response
response = await send_spotify_request(
tool_context,
"GET",
"https://api.spotify.com/v1/me/player",
params={"param": "value"},
json_data={"data": "value"},
)
assert response == mock_response
mock_httpx_client.request.assert_called_once_with(
"GET",
"https://api.spotify.com/v1/me/player",
headers={"Authorization": "Bearer test_token"},
params={"param": "value"},
json={"data": "value"},
)
def test_convert_to_playback_state():