Asana Toolkit (#381)

This commit is contained in:
Renato Byrro 2025-05-12 14:59:52 -03:00 committed by GitHub
parent 44d7a23bdd
commit b9afa1b5cf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 3719 additions and 0 deletions

View file

@ -1,3 +1,4 @@
arcade-asana
arcade-code-sandbox
arcade-dropbox
arcade-github

View file

@ -0,0 +1,18 @@
files: ^./
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: "v4.4.0"
hooks:
- id: check-case-conflict
- id: check-merge-conflict
- id: check-toml
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.7
hooks:
- id: ruff
args: [--fix]
- id: ruff-format

46
toolkits/asana/.ruff.toml Normal file
View file

@ -0,0 +1,46 @@
target-version = "py39"
line-length = 100
fix = true
[lint]
select = [
# flake8-2020
"YTT",
# flake8-bandit
"S",
# flake8-bugbear
"B",
# flake8-builtins
"A",
# flake8-comprehensions
"C4",
# flake8-debugger
"T10",
# flake8-simplify
"SIM",
# isort
"I",
# mccabe
"C90",
# pycodestyle
"E", "W",
# pyflakes
"F",
# pygrep-hooks
"PGH",
# pyupgrade
"UP",
# ruff
"RUF",
# tryceratops
"TRY",
]
[lint.per-file-ignores]
"*" = ["TRY003", "B904"]
"**/tests/*" = ["S101", "E501"]
"**/evals/*" = ["S101", "E501"]
[format]
preview = true
skip-magic-trailing-comma = false

21
toolkits/asana/LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Arcade
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

53
toolkits/asana/Makefile Normal file
View file

@ -0,0 +1,53 @@
.PHONY: help
help:
@echo "🛠️ dropbox Commands:\n"
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
.PHONY: install
install: ## Install the poetry environment and install the pre-commit hooks
@echo "📦 Checking if Poetry is installed"
@if ! command -v poetry &> /dev/null; then \
echo "📦 Installing Poetry with pip"; \
pip install poetry==1.8.5; \
else \
echo "📦 Poetry is already installed"; \
fi
@echo "🚀 Installing package in development mode with all extras"
poetry install --all-extras
.PHONY: build
build: clean-build ## Build wheel file using poetry
@echo "🚀 Creating wheel file"
poetry build
.PHONY: clean-build
clean-build: ## clean build artifacts
@echo "🗑️ Cleaning dist directory"
rm -rf dist
.PHONY: test
test: ## Test the code with pytest
@echo "🚀 Testing code: Running pytest"
@poetry run pytest -W ignore -v --cov --cov-config=pyproject.toml --cov-report=xml
.PHONY: coverage
coverage: ## Generate coverage report
@echo "coverage report"
coverage report
@echo "Generating coverage report"
coverage html
.PHONY: bump-version
bump-version: ## Bump the version in the pyproject.toml file
@echo "🚀 Bumping version in pyproject.toml"
poetry version patch
.PHONY: check
check: ## Run code quality tools.
@echo "🚀 Checking Poetry lock file consistency with 'pyproject.toml': Running poetry check"
@poetry check
@echo "🚀 Linting code: Running pre-commit"
@poetry run pre-commit run -a
@echo "🚀 Static type checking: Running mypy"
@poetry run mypy --config-file=pyproject.toml

View file

View file

@ -0,0 +1,139 @@
import os
from enum import Enum
ASANA_BASE_URL = "https://app.asana.com/api"
ASANA_API_VERSION = "1.0"
try:
ASANA_MAX_CONCURRENT_REQUESTS = int(os.getenv("ASANA_MAX_CONCURRENT_REQUESTS", 3))
except ValueError:
ASANA_MAX_CONCURRENT_REQUESTS = 3
try:
ASANA_MAX_TIMEOUT_SECONDS = int(os.getenv("ASANA_MAX_TIMEOUT_SECONDS", 20))
except ValueError:
ASANA_MAX_TIMEOUT_SECONDS = 20
MAX_PROJECTS_TO_SCAN_BY_NAME = 1000
MAX_TAGS_TO_SCAN_BY_NAME = 1000
PROJECT_OPT_FIELDS = [
"gid",
"resource_type",
"name",
"workspace",
"color",
"created_at",
"current_status_update",
"due_on",
"members",
"notes",
"completed",
"completed_at",
"completed_by",
"owner",
"team",
"workspace",
"permalink_url",
]
TASK_OPT_FIELDS = [
"gid",
"name",
"notes",
"completed",
"completed_at",
"completed_by",
"created_at",
"created_by",
"due_on",
"start_on",
"owner",
"team",
"workspace",
"permalink_url",
"approval_status",
"assignee",
"assignee_status",
"dependencies",
"dependents",
"memberships",
"num_subtasks",
"resource_type",
"custom_type",
"custom_type_status_option",
"parent",
"tags",
"workspace",
]
TAG_OPT_FIELDS = [
"gid",
"name",
"workspace",
]
TEAM_OPT_FIELDS = [
"gid",
"name",
"description",
"organization",
"permalink_url",
]
USER_OPT_FIELDS = [
"gid",
"resource_type",
"name",
"email",
"photo",
"workspaces",
]
WORKSPACE_OPT_FIELDS = [
"gid",
"resource_type",
"name",
"email_domains",
"is_organization",
]
class TaskSortBy(Enum):
DUE_DATE = "due_date"
CREATED_AT = "created_at"
COMPLETED_AT = "completed_at"
MODIFIED_AT = "modified_at"
LIKES = "likes"
class SortOrder(Enum):
ASCENDING = "ascending"
DESCENDING = "descending"
class TagColor(Enum):
DARK_GREEN = "dark-green"
DARK_RED = "dark-red"
DARK_BLUE = "dark-blue"
DARK_PURPLE = "dark-purple"
DARK_PINK = "dark-pink"
DARK_ORANGE = "dark-orange"
DARK_TEAL = "dark-teal"
DARK_BROWN = "dark-brown"
DARK_WARM_GRAY = "dark-warm-gray"
LIGHT_GREEN = "light-green"
LIGHT_RED = "light-red"
LIGHT_BLUE = "light-blue"
LIGHT_PURPLE = "light-purple"
LIGHT_PINK = "light-pink"
LIGHT_ORANGE = "light-orange"
LIGHT_TEAL = "light-teal"
LIGHT_BROWN = "light-brown"
LIGHT_WARM_GRAY = "light-warm-gray"
class ReturnType(Enum):
FULL_ITEMS_DATA = "full_items_data"
ITEMS_COUNT = "items_count"

View file

@ -0,0 +1,26 @@
from functools import wraps
from typing import Any, Callable
def clean_asana_response(func: Callable[..., Any]) -> Callable[..., Any]:
def response_cleaner(data: dict[str, Any]) -> dict[str, Any]:
if "gid" in data:
data["id"] = data["gid"]
del data["gid"]
for k, v in data.items():
if isinstance(v, dict):
data[k] = response_cleaner(v)
elif isinstance(v, list):
data[k] = [
item if not isinstance(item, dict) else response_cleaner(item) for item in v
]
return data
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
response = await func(*args, **kwargs)
return response_cleaner(response)
return wrapper

View file

@ -0,0 +1,14 @@
from arcade.sdk.errors import ToolExecutionError
class AsanaToolExecutionError(ToolExecutionError):
pass
class PaginationTimeoutError(AsanaToolExecutionError):
def __init__(self, timeout_seconds: int, tool_name: str):
message = f"Pagination timed out after {timeout_seconds} seconds"
super().__init__(
message=message,
developer_message=f"{message} while calling the tool {tool_name}",
)

View file

@ -0,0 +1,165 @@
import asyncio
import json
from dataclasses import dataclass
from typing import Optional, cast
import httpx
from arcade_asana.constants import ASANA_API_VERSION, ASANA_BASE_URL, ASANA_MAX_CONCURRENT_REQUESTS
from arcade_asana.decorators import clean_asana_response
from arcade_asana.exceptions import AsanaToolExecutionError
@dataclass
class AsanaClient:
auth_token: str
base_url: str = ASANA_BASE_URL
api_version: str = ASANA_API_VERSION
max_concurrent_requests: int = ASANA_MAX_CONCURRENT_REQUESTS
_semaphore: asyncio.Semaphore | None = None
def __post_init__(self) -> None:
self._semaphore = self._semaphore or asyncio.Semaphore(self.max_concurrent_requests)
def _build_url(self, endpoint: str, api_version: str | None = None) -> str:
api_version = api_version or self.api_version
return f"{self.base_url.rstrip('/')}/{api_version.strip('/')}/{endpoint.lstrip('/')}"
def _build_error_messages(self, response: httpx.Response) -> tuple[str, str]:
try:
data = response.json()
errors = data["errors"]
if len(errors) == 1:
error_message = errors[0]["message"]
developer_message = (
f"{errors[0]['message']} | {errors[0]['help']} "
f"(HTTP status code: {response.status_code})"
)
else:
errors_concat = "', '".join([error["message"] for error in errors])
error_message = f"Multiple errors occurred: '{errors_concat}'"
developer_message = (
f"Multiple errors occurred: {json.dumps(errors)} "
f"(HTTP status code: {response.status_code})"
)
except Exception as e:
error_message = "Failed to parse Asana error response"
developer_message = f"Failed to parse Asana error response: {type(e).__name__}: {e!s}"
return error_message, developer_message
def _raise_for_status(self, response: httpx.Response) -> None:
if response.status_code < 300:
return
error_message, developer_message = self._build_error_messages(response)
raise AsanaToolExecutionError(error_message, developer_message)
def _set_request_body(self, kwargs: dict, data: dict | None, json_data: dict | None) -> dict:
if data and json_data:
raise ValueError("Cannot provide both data and json_data")
if data:
kwargs["data"] = data
elif json_data:
kwargs["json"] = json_data
return kwargs
@clean_asana_response
async def get(
self,
endpoint: str,
params: Optional[dict] = None,
headers: Optional[dict] = None,
api_version: str | None = None,
) -> dict:
default_headers = {
"Authorization": f"Bearer {self.auth_token}",
"Accept": "application/json",
}
headers = {**default_headers, **(headers or {})}
kwargs = {
"url": self._build_url(endpoint, api_version),
"headers": headers,
}
if params:
kwargs["params"] = params
async with self._semaphore, httpx.AsyncClient() as client: # type: ignore[union-attr]
response = await client.get(**kwargs) # type: ignore[arg-type]
self._raise_for_status(response)
return cast(dict, response.json())
@clean_asana_response
async def post(
self,
endpoint: str,
data: Optional[dict] = None,
json_data: Optional[dict] = None,
files: Optional[dict] = None,
headers: Optional[dict] = None,
api_version: str | None = None,
) -> dict:
default_headers = {
"Authorization": f"Bearer {self.auth_token}",
"Accept": "application/json",
}
if files is None and json_data is not None:
default_headers["Content-Type"] = "application/json"
headers = {**default_headers, **(headers or {})}
kwargs = {
"url": self._build_url(endpoint, api_version),
"headers": headers,
}
if files is not None:
kwargs["files"] = files
if data is not None:
kwargs["data"] = data
else:
kwargs = self._set_request_body(kwargs, data, json_data)
async with self._semaphore, httpx.AsyncClient() as client: # type: ignore[union-attr]
response = await client.post(**kwargs) # type: ignore[arg-type]
self._raise_for_status(response)
return cast(dict, response.json())
@clean_asana_response
async def put(
self,
endpoint: str,
data: Optional[dict] = None,
json_data: Optional[dict] = None,
headers: Optional[dict] = None,
api_version: str | None = None,
) -> dict:
headers = headers or {}
headers["Authorization"] = f"Bearer {self.auth_token}"
headers["Content-Type"] = "application/json"
headers["Accept"] = "application/json"
kwargs = {
"url": self._build_url(endpoint, api_version),
"headers": headers,
}
kwargs = self._set_request_body(kwargs, data, json_data)
async with self._semaphore, httpx.AsyncClient() as client: # type: ignore[union-attr]
response = await client.put(**kwargs) # type: ignore[arg-type]
self._raise_for_status(response)
return cast(dict, response.json())
async def get_current_user(self) -> dict:
response = await self.get("/users/me")
return cast(dict, response["data"])

View file

@ -0,0 +1,31 @@
from arcade_asana.tools.projects import get_project_by_id, list_projects
from arcade_asana.tools.tags import create_tag, list_tags
from arcade_asana.tools.tasks import (
attach_file_to_task,
create_task,
get_subtasks_from_a_task,
get_task_by_id,
get_tasks_without_id,
update_task,
)
from arcade_asana.tools.teams import get_team_by_id, list_teams_the_current_user_is_a_member_of
from arcade_asana.tools.users import get_user_by_id, list_users
from arcade_asana.tools.workspaces import list_workspaces
__all__ = [
"attach_file_to_task",
"create_tag",
"create_task",
"get_project_by_id",
"get_subtasks_from_a_task",
"get_task_by_id",
"get_team_by_id",
"get_user_by_id",
"list_projects",
"list_tags",
"list_teams_the_current_user_is_a_member_of",
"list_users",
"list_workspaces",
"get_tasks_without_id",
"update_task",
]

View file

@ -0,0 +1,81 @@
from typing import Annotated, Any
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Asana
from arcade_asana.constants import PROJECT_OPT_FIELDS
from arcade_asana.models import AsanaClient
from arcade_asana.utils import (
get_next_page,
get_unique_workspace_id_or_raise_error,
remove_none_values,
)
@tool(requires_auth=Asana(scopes=["default"]))
async def get_project_by_id(
context: ToolContext,
project_id: Annotated[str, "The ID of the project."],
) -> Annotated[
dict[str, Any],
"Get a project by its ID",
]:
"""Get an Asana project by its ID"""
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(
f"/projects/{project_id}",
params={"opt_fields": PROJECT_OPT_FIELDS},
)
return {"project": response["data"]}
@tool(requires_auth=Asana(scopes=["default"]))
async def list_projects(
context: ToolContext,
team_id: Annotated[
str | None,
"The team ID to get projects from. Defaults to None (does not filter by team).",
] = None,
workspace_id: Annotated[
str | None,
"The workspace ID to get projects from. Defaults to None. If not provided and the user "
"has only one workspace, it will use that workspace. If not provided and the user has "
"multiple workspaces, it will raise an error listing the available workspaces.",
] = None,
limit: Annotated[
int, "The maximum number of projects to return. Min is 1, max is 100. Defaults to 100."
] = 100,
next_page_token: Annotated[
str | None,
"The token to retrieve the next page of projects. Defaults to None (start from the first "
"page of projects).",
] = None,
) -> Annotated[
dict[str, Any],
"List projects in Asana associated to teams the current user is a member of",
]:
"""List projects in Asana"""
# Note: Asana recommends filtering by team to avoid timeout in large domains.
# Ref: https://developers.asana.com/reference/getprojects
limit = max(1, min(100, limit))
workspace_id = workspace_id or await get_unique_workspace_id_or_raise_error(context)
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(
"/projects",
params=remove_none_values({
"limit": limit,
"offset": next_page_token,
"team": team_id,
"workspace": workspace_id,
"opt_fields": PROJECT_OPT_FIELDS,
}),
)
return {
"projects": response["data"],
"count": len(response["data"]),
"next_page": get_next_page(response),
}

View file

@ -0,0 +1,102 @@
from typing import Annotated, Any
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Asana
from arcade.sdk.errors import ToolExecutionError
from arcade_asana.constants import TAG_OPT_FIELDS, TagColor
from arcade_asana.models import AsanaClient
from arcade_asana.utils import (
get_next_page,
get_unique_workspace_id_or_raise_error,
remove_none_values,
)
@tool(requires_auth=Asana(scopes=["default"]))
async def get_tag_by_id(
context: ToolContext,
tag_id: Annotated[str, "The ID of the Asana tag to get"],
) -> Annotated[dict[str, Any], "Get an Asana tag by its ID"]:
"""Get an Asana tag by its ID"""
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(f"/tags/{tag_id}")
return {"tag": response["data"]}
@tool(requires_auth=Asana(scopes=["default"]))
async def create_tag(
context: ToolContext,
name: Annotated[str, "The name of the tag to create. Length must be between 1 and 100."],
description: Annotated[
str | None, "The description of the tag to create. Defaults to None (no description)."
] = None,
color: Annotated[
TagColor | None, "The color of the tag to create. Defaults to None (no color)."
] = None,
workspace_id: Annotated[
str | None,
"The ID of the workspace to create the tag in. If not provided, it will associated the tag "
"to a current workspace, if there's only one. Otherwise, it will raise an error.",
] = None,
) -> Annotated[dict[str, Any], "The created tag."]:
"""Create a tag in Asana"""
if not 1 <= len(name) <= 100:
raise ToolExecutionError("Tag name must be between 1 and 100 characters long.")
workspace_id = workspace_id or await get_unique_workspace_id_or_raise_error(context)
data = remove_none_values({
"name": name,
"notes": description,
"color": color.value if color else None,
"workspace": workspace_id,
})
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.post("/tags", json_data={"data": data})
return {"tag": response["data"]}
@tool(requires_auth=Asana(scopes=["default"]))
async def list_tags(
context: ToolContext,
workspace_id: Annotated[
str | None,
"The workspace ID to retrieve tags from. Defaults to None. If not provided and the user "
"has only one workspace, it will use that workspace. If not provided and the user has "
"multiple workspaces, it will raise an error listing the available workspaces.",
] = None,
limit: Annotated[
int, "The maximum number of tags to return. Min is 1, max is 100. Defaults to 100."
] = 100,
next_page_token: Annotated[
str | None,
"The token to retrieve the next page of tags. Defaults to None (start from the first page "
"of tags)",
] = None,
) -> Annotated[
dict[str, Any],
"List tags in an Asana workspace",
]:
"""List tags in an Asana workspace"""
limit = max(1, min(100, limit))
workspace_id = workspace_id or await get_unique_workspace_id_or_raise_error(context)
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(
"/tags",
params=remove_none_values({
"limit": limit,
"offset": next_page_token,
"workspace": workspace_id,
"opt_fields": TAG_OPT_FIELDS,
}),
)
return {
"tags": response["data"],
"count": len(response["data"]),
"next_page": get_next_page(response),
}

View file

@ -0,0 +1,455 @@
import base64
from typing import Annotated, Any
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Asana
from arcade.sdk.errors import ToolExecutionError
from arcade_asana.constants import TASK_OPT_FIELDS, SortOrder, TaskSortBy
from arcade_asana.models import AsanaClient
from arcade_asana.utils import (
build_task_search_query_params,
get_next_page,
get_project_by_name_or_raise_error,
get_tag_ids,
get_unique_workspace_id_or_raise_error,
handle_new_task_associations,
handle_new_task_tags,
remove_none_values,
validate_date_format,
)
@tool(requires_auth=Asana(scopes=["default"]))
async def get_tasks_without_id(
context: ToolContext,
keywords: Annotated[
str | None, "Keywords to search for tasks. Matches against the task name and description."
] = None,
workspace_id: Annotated[
str | None,
"The workspace ID to search for tasks. Defaults to None. If not provided and the user "
"has only one workspace, it will use that workspace. If not provided and the user has "
"multiple workspaces, it will raise an error listing the available workspaces.",
] = None,
assignee_id: Annotated[
str | None,
"The ID of the user to filter tasks assigned to. "
"Defaults to None (does not filter by assignee).",
] = None,
project: Annotated[
str | None,
"The ID or name of the project to filter tasks. "
"Defaults to None (searches tasks associated to any project or no project).",
] = None,
team_id: Annotated[
str | None,
"Restricts the search to tasks associated to the given team ID. "
"Defaults to None (searches tasks associated to any team).",
] = None,
tags: Annotated[
list[str] | None,
"Restricts the search to tasks associated to the given tags. "
"Each item in the list can be a tag name (e.g. 'My Tag') or a tag ID (e.g. '1234567890'). "
"Defaults to None (searches tasks associated to any tag or no tag).",
] = None,
due_on: Annotated[
str | None,
"Match tasks that are due exactly on this date. Format: YYYY-MM-DD. Ex: '2025-01-01'. "
"Defaults to None (searches tasks due on any date or without a due date).",
] = None,
due_on_or_after: Annotated[
str | None,
"Match tasks that are due on OR AFTER this date. Format: YYYY-MM-DD. Ex: '2025-01-01' "
"Defaults to None (searches tasks due on any date or without a due date).",
] = None,
due_on_or_before: Annotated[
str | None,
"Match tasks that are due on OR BEFORE this date. Format: YYYY-MM-DD. Ex: '2025-01-01' "
"Defaults to None (searches tasks due on any date or without a due date).",
] = None,
start_on: Annotated[
str | None,
"Match tasks that started on this date. Format: YYYY-MM-DD. Ex: '2025-01-01'. "
"Defaults to None (searches tasks started on any date or without a start date).",
] = None,
start_on_or_after: Annotated[
str | None,
"Match tasks that started on OR AFTER this date. Format: YYYY-MM-DD. Ex: '2025-01-01' "
"Defaults to None (searches tasks started on any date or without a start date).",
] = None,
start_on_or_before: Annotated[
str | None,
"Match tasks that started on OR BEFORE this date. Format: YYYY-MM-DD. Ex: '2025-01-01' "
"Defaults to None (searches tasks started on any date or without a start date).",
] = None,
completed: Annotated[
bool | None,
"Match tasks that are completed. Defaults to None (does not filter by completion status).",
] = None,
limit: Annotated[
int,
"The maximum number of tasks to return. Min of 1, max of 100. Defaults to 100.",
] = 100,
sort_by: Annotated[
TaskSortBy,
"The field to sort the tasks by. Defaults to TaskSortBy.MODIFIED_AT.",
] = TaskSortBy.MODIFIED_AT,
sort_order: Annotated[
SortOrder,
"The order to sort the tasks by. Defaults to SortOrder.DESCENDING.",
] = SortOrder.DESCENDING,
) -> Annotated[dict[str, Any], "The tasks that match the query."]:
"""Search for tasks"""
limit = max(1, min(100, limit))
project_id = None
if project:
if project.isnumeric():
project_id = project
else:
project_data = await get_project_by_name_or_raise_error(context, project)
project_id = project_data["id"]
if not workspace_id:
workspace_id = project_data["workspace"]["id"]
tag_ids = await get_tag_ids(context, tags)
client = AsanaClient(context.get_auth_token_or_empty())
validate_date_format("due_on", due_on)
validate_date_format("due_on_or_after", due_on_or_after)
validate_date_format("due_on_or_before", due_on_or_before)
validate_date_format("start_on", start_on)
validate_date_format("start_on_or_after", start_on_or_after)
validate_date_format("start_on_or_before", start_on_or_before)
if not any([workspace_id, project_id, team_id]):
workspace_id = await get_unique_workspace_id_or_raise_error(context)
if not workspace_id and team_id:
from arcade_asana.tools.teams import get_team_by_id
team = await get_team_by_id(context, team_id)
workspace_id = team["organization"]["id"]
response = await client.get(
f"/workspaces/{workspace_id}/tasks/search",
params=build_task_search_query_params(
keywords=keywords,
completed=completed,
assignee_id=assignee_id,
project_id=project_id,
team_id=team_id,
tag_ids=tag_ids,
due_on=due_on,
due_on_or_after=due_on_or_after,
due_on_or_before=due_on_or_before,
start_on=start_on,
start_on_or_after=start_on_or_after,
start_on_or_before=start_on_or_before,
limit=limit,
sort_by=sort_by,
sort_order=sort_order,
),
)
tasks_by_id = {task["id"]: task for task in response["data"]}
tasks = list(tasks_by_id.values())
return {"tasks": tasks, "count": len(tasks)}
@tool(requires_auth=Asana(scopes=["default"]))
async def get_task_by_id(
context: ToolContext,
task_id: Annotated[str, "The ID of the task to get."],
max_subtasks: Annotated[
int,
"The maximum number of subtasks to return. "
"Min of 0 (no subtasks), max of 100. Defaults to 100.",
] = 100,
) -> Annotated[dict[str, Any], "The task with the given ID."]:
"""Get a task by its ID"""
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(
f"/tasks/{task_id}",
params={"opt_fields": TASK_OPT_FIELDS},
)
if max_subtasks > 0:
max_subtasks = min(max_subtasks, 100)
subtasks = await get_subtasks_from_a_task(context, task_id=task_id, limit=max_subtasks)
response["data"]["subtasks"] = subtasks["subtasks"]
return {"task": response["data"]}
@tool(requires_auth=Asana(scopes=["default"]))
async def get_subtasks_from_a_task(
context: ToolContext,
task_id: Annotated[str, "The ID of the task to get the subtasks of."],
limit: Annotated[
int,
"The maximum number of subtasks to return. Min of 1, max of 100. Defaults to 100.",
] = 100,
next_page_token: Annotated[
str | None,
"The token to retrieve the next page of subtasks. Defaults to None (start from the first "
"page of subtasks)",
] = None,
) -> Annotated[dict[str, Any], "The subtasks of the task."]:
"""Get the subtasks of a task"""
limit = max(1, min(100, limit))
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(
f"/tasks/{task_id}/subtasks",
params=remove_none_values({
"opt_fields": TASK_OPT_FIELDS,
"limit": limit,
"offset": next_page_token,
}),
)
return {
"subtasks": response["data"],
"count": len(response["data"]),
"next_page": get_next_page(response),
}
@tool(requires_auth=Asana(scopes=["default"]))
async def update_task(
context: ToolContext,
task_id: Annotated[str, "The ID of the task to update."],
name: Annotated[
str | None,
"The new name of the task. Defaults to None (does not change the current name).",
] = None,
completed: Annotated[
bool | None,
"The new completion status of the task. "
"Provide True to mark the task as completed, False to mark it as not completed. "
"Defaults to None (does not change the current completion status).",
] = None,
start_date: Annotated[
str | None,
"The new start date of the task in the format YYYY-MM-DD. Example: '2025-01-01'. "
"Defaults to None (does not change the current start date).",
] = None,
due_date: Annotated[
str | None,
"The new due date of the task in the format YYYY-MM-DD. Example: '2025-01-01'. "
"Defaults to None (does not change the current due date).",
] = None,
description: Annotated[
str | None,
"The new description of the task. "
"Defaults to None (does not change the current description).",
] = None,
assignee_id: Annotated[
str | None,
"The ID of the new user to assign the task to. "
"Provide 'me' to assign the task to the current user. "
"Defaults to None (does not change the current assignee).",
] = None,
) -> Annotated[
dict[str, Any],
"Updates a task in Asana",
]:
"""Updates a task in Asana"""
client = AsanaClient(context.get_auth_token_or_empty())
validate_date_format("start_date", start_date)
validate_date_format("due_date", due_date)
task_data = {
"data": remove_none_values({
"name": name,
"completed": completed,
"due_on": due_date,
"start_on": start_date,
"notes": description,
"assignee": assignee_id,
}),
}
response = await client.put(f"/tasks/{task_id}", json_data=task_data)
return {
"status": {"success": True, "message": "task updated successfully"},
"task": response["data"],
}
@tool(requires_auth=Asana(scopes=["default"]))
async def mark_task_as_completed(
context: ToolContext,
task_id: Annotated[str, "The ID of the task to mark as completed."],
) -> Annotated[dict[str, Any], "The task marked as completed."]:
"""Mark a task in Asana as completed"""
return await update_task(context, task_id, completed=True) # type: ignore[no-any-return]
@tool(requires_auth=Asana(scopes=["default"]))
async def create_task(
context: ToolContext,
name: Annotated[str, "The name of the task"],
start_date: Annotated[
str | None,
"The start date of the task in the format YYYY-MM-DD. Example: '2025-01-01'. "
"Defaults to None.",
] = None,
due_date: Annotated[
str | None,
"The due date of the task in the format YYYY-MM-DD. Example: '2025-01-01'. "
"Defaults to None.",
] = None,
description: Annotated[str | None, "The description of the task. Defaults to None."] = None,
parent_task_id: Annotated[str | None, "The ID of the parent task. Defaults to None."] = None,
workspace_id: Annotated[
str | None, "The ID of the workspace to associate the task to. Defaults to None."
] = None,
project: Annotated[
str | None, "The ID or name of the project to associate the task to. Defaults to None."
] = None,
assignee_id: Annotated[
str | None,
"The ID of the user to assign the task to. "
"Defaults to 'me', which assigns the task to the current user.",
] = "me",
tags: Annotated[
list[str] | None,
"The tags to associate with the task. Multiple tags can be provided in the list. "
"Each item in the list can be a tag name (e.g. 'My Tag') or a tag ID (e.g. '1234567890'). "
"If a tag name does not exist, it will be automatically created with the new task. "
"Defaults to None (no tags associated).",
] = None,
) -> Annotated[
dict[str, Any],
"Creates a task in Asana",
]:
"""Creates a task in Asana
The task must be associated to at least one of the following: parent_task_id, project, or
workspace_id. If none of these are provided and the account has only one workspace, the task
will be associated to that workspace. If the account has multiple workspaces, an error will
be raised with a list of available workspaces.
"""
client = AsanaClient(context.get_auth_token_or_empty())
parent_task_id, project_id, workspace_id = await handle_new_task_associations(
context, parent_task_id, project, workspace_id
)
tag_ids = await handle_new_task_tags(context, tags, workspace_id)
validate_date_format("start_date", start_date)
validate_date_format("due_date", due_date)
task_data = {
"data": remove_none_values({
"name": name,
"due_on": due_date,
"start_on": start_date,
"notes": description,
"parent": parent_task_id,
"projects": [project_id] if project_id else None,
"workspace": workspace_id,
"assignee": assignee_id,
"tags": tag_ids,
}),
}
response = await client.post("tasks", json_data=task_data)
return {
"status": {"success": True, "message": "task successfully created"},
"task": response["data"],
}
@tool(requires_auth=Asana(scopes=["default"]))
async def attach_file_to_task(
context: ToolContext,
task_id: Annotated[str, "The ID of the task to attach the file to."],
file_name: Annotated[
str,
"The name of the file to attach with format extension. E.g. 'Image.png' or 'Report.pdf'.",
],
file_content_str: Annotated[
str | None,
"The string contents of the file to attach. Use this if the file is a text file. "
"Defaults to None.",
] = None,
file_content_base64: Annotated[
str | None,
"The base64-encoded binary contents of the file. "
"Use this for binary files like images or PDFs. Defaults to None.",
] = None,
file_content_url: Annotated[
str | None,
"The URL of the file to attach. Use this if the file is hosted on an external URL. "
"Defaults to None.",
] = None,
file_encoding: Annotated[
str,
"The encoding of the file to attach. Only used with file_content_str. Defaults to 'utf-8'.",
] = "utf-8",
) -> Annotated[dict[str, Any], "The task with the file attached."]:
"""Attaches a file to an Asana task
Provide exactly one of file_content_str, file_content_base64, or file_content_url, never more
than one.
- Use file_content_str for text files (will be encoded using file_encoding)
- Use file_content_base64 for binary files like images, PDFs, etc.
- Use file_content_url if the file is hosted on an external URL
"""
client = AsanaClient(context.get_auth_token_or_empty())
if sum([bool(file_content_str), bool(file_content_base64), bool(file_content_url)]) != 1:
raise ToolExecutionError(
"Provide exactly one of file_content_str, file_content_base64, or file_content_url"
)
data = {
"parent": task_id,
"name": file_name,
"resource_subtype": "asana",
}
if file_content_url is not None:
data["url"] = file_content_url
data["resource_subtype"] = "external"
file_content = None
elif file_content_str is not None:
try:
file_content = file_content_str.encode(file_encoding)
except LookupError as exc:
raise ToolExecutionError(f"Unknown encoding: {file_encoding}") from exc
except Exception as exc:
raise ToolExecutionError(
f"Failed to encode file content string with {file_encoding} encoding: {exc!s}"
) from exc
elif file_content_base64 is not None:
try:
file_content = base64.b64decode(file_content_base64)
except Exception as exc:
raise ToolExecutionError(f"Failed to decode base64 file content: {exc!s}") from exc
if file_content:
if file_name.lower().endswith(".pdf"):
files = {"file": (file_name, file_content, "application/pdf")}
else:
files = {"file": (file_name, file_content)} # type: ignore[dict-item]
else:
files = None
response = await client.post("/attachments", data=data, files=files)
return {
"status": {"success": True, "message": f"file successfully attached to task {task_id}"},
"response": response["data"],
}

View file

@ -0,0 +1,110 @@
from typing import Annotated, Any
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Asana
from arcade_asana.constants import TEAM_OPT_FIELDS
from arcade_asana.models import AsanaClient
from arcade_asana.utils import (
get_next_page,
get_unique_workspace_id_or_raise_error,
remove_none_values,
)
@tool(requires_auth=Asana(scopes=["default"]))
async def get_team_by_id(
context: ToolContext,
team_id: Annotated[str, "The ID of the Asana team to get"],
) -> Annotated[dict[str, Any], "Get an Asana team by its ID"]:
"""Get an Asana team by its ID"""
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(
f"/teams/{team_id}",
params=remove_none_values({"opt_fields": TEAM_OPT_FIELDS}),
)
return {"team": response["data"]}
@tool(requires_auth=Asana(scopes=["default"]))
async def list_teams_the_current_user_is_a_member_of(
context: ToolContext,
workspace_id: Annotated[
str | None,
"The workspace ID to list teams from. Defaults to None. If no workspace ID is provided, "
"it will use the current user's workspace , if there's only one. If the user has multiple "
"workspaces, it will raise an error.",
] = None,
limit: Annotated[
int, "The maximum number of teams to return. Min is 1, max is 100. Defaults to 100."
] = 100,
next_page_token: Annotated[
str | None,
"The token to retrieve the next page of teams. Defaults to None (start from the first page "
"of teams)",
] = None,
) -> Annotated[
dict[str, Any],
"List teams in Asana that the current user is a member of",
]:
"""List teams in Asana that the current user is a member of"""
limit = max(1, min(100, limit))
workspace_id = workspace_id or await get_unique_workspace_id_or_raise_error(context)
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(
"/users/me/teams",
params=remove_none_values({
"limit": limit,
"offset": next_page_token,
"opt_fields": TEAM_OPT_FIELDS,
"organization": workspace_id,
}),
)
return {
"teams": response["data"],
"count": len(response["data"]),
"next_page": get_next_page(response),
}
@tool(requires_auth=Asana(scopes=["default"]))
async def list_teams(
context: ToolContext,
workspace_id: Annotated[
str | None,
"The workspace ID to list teams from. Defaults to None. If no workspace ID is provided, "
"it will use the current user's workspace, if there's only one. If the user has multiple "
"workspaces, it will raise an error listing the available workspaces.",
] = None,
limit: Annotated[
int, "The maximum number of teams to return. Min is 1, max is 100. Defaults to 100."
] = 100,
next_page_token: Annotated[
str | None,
"The token to retrieve the next page of teams. Defaults to None (start from the first page "
"of teams)",
] = None,
) -> Annotated[dict[str, Any], "List teams in an Asana workspace"]:
"""List teams in an Asana workspace"""
limit = max(1, min(100, limit))
workspace_id = workspace_id or await get_unique_workspace_id_or_raise_error(context)
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(
f"/workspaces/{workspace_id}/teams",
params=remove_none_values({
"limit": limit,
"offset": next_page_token,
"opt_fields": TEAM_OPT_FIELDS,
}),
)
return {
"teams": response["data"],
"count": len(response["data"]),
"next_page": get_next_page(response),
}

View file

@ -0,0 +1,69 @@
from typing import Annotated, Any
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Asana
from arcade_asana.constants import USER_OPT_FIELDS
from arcade_asana.models import AsanaClient
from arcade_asana.utils import (
get_next_page,
get_unique_workspace_id_or_raise_error,
remove_none_values,
)
@tool(requires_auth=Asana(scopes=["default"]))
async def list_users(
context: ToolContext,
workspace_id: Annotated[
str | None,
"The workspace ID to list users from. Defaults to None. If no workspace ID is provided, "
"it will use the current user's workspace , if there's only one. If the user has multiple "
"workspaces, it will raise an error.",
] = None,
limit: Annotated[
int,
"The maximum number of users to retrieve. Min is 1, max is 100. Defaults to 100.",
] = 100,
next_page_token: Annotated[
str | None,
"The token to retrieve the next page of users. Defaults to None (start from the first page "
"of users)",
] = None,
) -> Annotated[
dict[str, Any],
"List users in Asana",
]:
"""List users in Asana"""
limit = max(1, min(100, limit))
if not workspace_id:
workspace_id = await get_unique_workspace_id_or_raise_error(context)
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(
"/users",
params=remove_none_values({
"workspace": workspace_id,
"limit": limit,
"offset": next_page_token,
"opt_fields": USER_OPT_FIELDS,
}),
)
return {
"users": response["data"],
"count": len(response["data"]),
"next_page": get_next_page(response),
}
@tool(requires_auth=Asana(scopes=["default"]))
async def get_user_by_id(
context: ToolContext,
user_id: Annotated[str, "The user ID to get."],
) -> Annotated[dict[str, Any], "The user information."]:
"""Get a user by ID"""
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(f"/users/{user_id}", params={"opt_fields": USER_OPT_FIELDS})
return {"user": response}

View file

@ -0,0 +1,54 @@
from typing import Annotated, Any
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Asana
from arcade_asana.constants import WORKSPACE_OPT_FIELDS
from arcade_asana.models import AsanaClient
from arcade_asana.utils import get_next_page, remove_none_values
@tool(requires_auth=Asana(scopes=["default"]))
async def get_workspace_by_id(
context: ToolContext,
workspace_id: Annotated[str, "The ID of the Asana workspace to get"],
) -> Annotated[dict[str, Any], "Get an Asana workspace by its ID"]:
"""Get an Asana workspace by its ID"""
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(f"/workspaces/{workspace_id}")
return {"workspace": response["data"]}
@tool(requires_auth=Asana(scopes=["default"]))
async def list_workspaces(
context: ToolContext,
limit: Annotated[
int, "The maximum number of workspaces to return. Min is 1, max is 100. Defaults to 100."
] = 100,
next_page_token: Annotated[
str | None,
"The token to retrieve the next page of workspaces. Defaults to None (start from the first "
"page of workspaces)",
] = None,
) -> Annotated[
dict[str, Any],
"List workspaces in Asana that are visible to the authenticated user",
]:
"""List workspaces in Asana that are visible to the authenticated user"""
limit = max(1, min(100, limit))
client = AsanaClient(context.get_auth_token_or_empty())
response = await client.get(
"/workspaces",
params=remove_none_values({
"limit": limit,
"offset": next_page_token,
"opt_fields": WORKSPACE_OPT_FIELDS,
}),
)
return {
"workspaces": response["data"],
"count": len(response["data"]),
"next_page": get_next_page(response),
}

View file

@ -0,0 +1,483 @@
import asyncio
import json
from collections.abc import Awaitable
from datetime import datetime
from typing import Any, Callable, TypeVar, cast
from arcade.sdk import ToolContext
from arcade.sdk.errors import RetryableToolError, ToolExecutionError
from arcade_asana.constants import (
ASANA_MAX_TIMEOUT_SECONDS,
MAX_PROJECTS_TO_SCAN_BY_NAME,
MAX_TAGS_TO_SCAN_BY_NAME,
TASK_OPT_FIELDS,
SortOrder,
TaskSortBy,
)
from arcade_asana.exceptions import PaginationTimeoutError
ToolResponse = TypeVar("ToolResponse", bound=dict[str, Any])
def remove_none_values(data: dict[str, Any]) -> dict[str, Any]:
return {k: v for k, v in data.items() if v is not None}
def validate_date_format(name: str, date_str: str | None) -> None:
if not date_str:
return
try:
datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
raise ToolExecutionError(f"Invalid {name} date format. Use the format YYYY-MM-DD.")
def build_task_search_query_params(
keywords: str | None,
completed: bool | None,
assignee_id: str | None,
project_id: str | None,
team_id: str | None,
tag_ids: list[str] | None,
due_on: str | None,
due_on_or_after: str | None,
due_on_or_before: str | None,
start_on: str | None,
start_on_or_after: str | None,
start_on_or_before: str | None,
limit: int,
sort_by: TaskSortBy,
sort_order: SortOrder,
) -> dict[str, Any]:
query_params: dict[str, Any] = {
"text": keywords,
"opt_fields": TASK_OPT_FIELDS,
"sort_by": sort_by.value,
"sort_ascending": sort_order == SortOrder.ASCENDING,
"limit": limit,
}
if completed is not None:
query_params["completed"] = completed
if assignee_id:
query_params["assignee.any"] = assignee_id
if project_id:
query_params["projects.any"] = project_id
if team_id:
query_params["team.any"] = team_id
if tag_ids:
query_params["tags.any"] = ",".join(tag_ids)
query_params = add_task_search_date_params(
query_params,
due_on,
due_on_or_after,
due_on_or_before,
start_on,
start_on_or_after,
start_on_or_before,
)
return query_params
def add_task_search_date_params(
query_params: dict[str, Any],
due_on: str | None,
due_on_or_after: str | None,
due_on_or_before: str | None,
start_on: str | None,
start_on_or_after: str | None,
start_on_or_before: str | None,
) -> dict[str, Any]:
"""
Builds the date-related query parameters for task search.
If a date is provided, it will be added to the query parameters. If not, it will be ignored.
"""
if due_on:
query_params["due_on"] = due_on
if due_on_or_after:
query_params["due_on.after"] = due_on_or_after
if due_on_or_before:
query_params["due_on.before"] = due_on_or_before
if start_on:
query_params["start_on"] = start_on
if start_on_or_after:
query_params["start_on.after"] = start_on_or_after
if start_on_or_before:
query_params["start_on.before"] = start_on_or_before
return query_params
async def handle_new_task_associations(
context: ToolContext,
parent_task_id: str | None,
project: str | None,
workspace_id: str | None,
) -> tuple[str | None, str | None, str | None]:
"""
Handles the association of a new task to a parent task, project, or workspace.
If no association is provided, it will try to find a workspace in the user's account.
In case the user has only one workspace, it will use that workspace.
Otherwise, it will raise an error.
If a workspace_id is not provided, but a parent_task_id or a project_id is provided, it will try
to find the workspace associated with the parent task or project.
In each of the two cases explained above, if a workspace is found, the function will return this
value, even if the workspace_id argument was None.
Returns a tuple of (parent_task_id, project_id, workspace_id).
"""
project_id, project_name = (None, None)
if project:
if project.isnumeric():
project_id = project
else:
project_name = project
if project_name:
project_data = await get_project_by_name_or_raise_error(context, project_name)
project_id = project_data["id"]
workspace_id = project_data["workspace"]["id"]
if not any([parent_task_id, project_id, workspace_id]):
workspace_id = await get_unique_workspace_id_or_raise_error(context)
if not workspace_id and parent_task_id:
from arcade_asana.tools.tasks import get_task_by_id # avoid circular imports
response = await get_task_by_id(context, parent_task_id)
workspace_id = response["task"]["workspace"]["id"]
return parent_task_id, project_id, workspace_id
async def get_project_by_name_or_raise_error(
context: ToolContext,
project_name: str,
max_items_to_scan: int = MAX_PROJECTS_TO_SCAN_BY_NAME,
) -> dict[str, Any]:
response = await find_projects_by_name(
context=context,
names=[project_name],
response_limit=100,
max_items_to_scan=max_items_to_scan,
return_projects_not_matched=True,
)
if not response["matches"]["projects"]:
projects = response["not_matched"]["projects"]
projects = [{"name": project["name"], "id": project["id"]} for project in projects]
message = (
f"Project with name '{project_name}' was not found. The search scans up to "
f"{max_items_to_scan} projects. If the user account has a larger number of projects, "
"it's possible that it exists, but the search didn't find it."
)
additional_prompt = f"Projects available: {json.dumps(projects)}"
raise RetryableToolError(
message=message,
developer_message=f"{message} {additional_prompt}",
additional_prompt_content=additional_prompt,
)
elif response["matches"]["count"] > 1:
projects = [
{"name": project["name"], "id": project["id"]}
for project in response["matches"]["projects"]
]
message = "Multiple projects found with the same name. Please provide a project ID instead."
additional_prompt = f"Projects matching the name '{project_name}': {json.dumps(projects)}"
raise RetryableToolError(
message=message,
developer_message=message,
additional_prompt_content=additional_prompt,
)
return cast(dict, response["matches"]["projects"][0])
async def handle_new_task_tags(
context: ToolContext,
tags: list[str] | None,
workspace_id: str | None,
) -> list[str] | None:
if not tags:
return None
tag_ids = []
tag_names = []
for tag in tags:
if tag.isnumeric():
tag_ids.append(tag)
else:
tag_names.append(tag)
if tag_names:
response = await find_tags_by_name(context, tag_names)
tag_ids.extend([tag["id"] for tag in response["matches"]["tags"]])
if response["not_found"]["tags"]:
from arcade_asana.tools.tags import create_tag # avoid circular imports
responses = await asyncio.gather(*[
create_tag(context, name=name, workspace_id=workspace_id)
for name in response["not_found"]["tags"]
])
tag_ids.extend([response["tag"]["id"] for response in responses])
return tag_ids
async def get_tag_ids(
context: ToolContext,
tags: list[str] | None,
max_items_to_scan: int = MAX_TAGS_TO_SCAN_BY_NAME,
) -> list[str] | None:
"""
Returns the IDs of the tags provided in the tags list, which can be either tag IDs or tag names.
If the tags list is empty, it returns None.
"""
tag_ids = []
tag_names = []
if tags:
for tag in tags:
if tag.isnumeric():
tag_ids.append(tag)
else:
tag_names.append(tag)
if tag_names:
searched_tags = await find_tags_by_name(
context=context,
names=tag_names,
max_items_to_scan=max_items_to_scan,
)
if searched_tags["not_found"]["tags"]:
tag_names_not_found = ", ".join(searched_tags["not_found"]["tags"])
raise ToolExecutionError(
f"Tags not found: {tag_names_not_found}. The search scans up to "
f"{max_items_to_scan} tags. If the user account has a larger number of tags, "
"it's possible that the tags exist, but the search didn't find them."
)
tag_ids.extend([tag["id"] for tag in searched_tags["matches"]["tags"]])
return tag_ids if tag_ids else None
async def paginate_tool_call(
tool: Callable[[ToolContext, Any], Awaitable[ToolResponse]],
context: ToolContext,
response_key: str,
max_items: int = 300,
timeout_seconds: int = ASANA_MAX_TIMEOUT_SECONDS,
next_page_token: str | None = None,
**tool_kwargs: Any,
) -> list[ToolResponse]:
results: list[ToolResponse] = []
async def paginate_loop() -> None:
nonlocal results
keep_paginating = True
if "limit" not in tool_kwargs:
tool_kwargs["limit"] = 100
while keep_paginating:
response = await tool(context, **tool_kwargs) # type: ignore[call-arg]
results.extend(response[response_key])
next_page = get_next_page(response)
next_page_token = next_page["next_page_token"]
if not next_page_token or len(results) >= max_items:
keep_paginating = False
else:
tool_kwargs["next_page_token"] = next_page_token
try:
await asyncio.wait_for(paginate_loop(), timeout=timeout_seconds)
except TimeoutError:
raise PaginationTimeoutError(timeout_seconds, tool.__tool_name__) # type: ignore[attr-defined]
else:
return results
async def get_unique_workspace_id_or_raise_error(context: ToolContext) -> str:
# Importing here to avoid circular imports
from arcade_asana.tools.workspaces import list_workspaces
workspaces = await list_workspaces(context)
if len(workspaces["workspaces"]) == 1:
return cast(str, workspaces["workspaces"][0]["id"])
else:
message = "User has multiple workspaces. Please provide a workspace ID."
additional_prompt = f"Workspaces available: {json.dumps(workspaces['workspaces'])}"
raise RetryableToolError(
message=message,
developer_message=message,
additional_prompt_content=additional_prompt,
)
async def find_projects_by_name(
context: ToolContext,
names: list[str],
team_id: list[str] | None = None,
response_limit: int = 100,
max_items_to_scan: int = MAX_PROJECTS_TO_SCAN_BY_NAME,
return_projects_not_matched: bool = False,
) -> dict[str, Any]:
"""Find projects by name using exact match
This function will paginate the list_projects tool call and return the projects that match
the names provided. If the names provided are not found, it will return the names searched for
that did not match any projects.
If return_projects_not_matched is True, it will also return the projects that were scanned,
but did not match any of the names searched for.
Args:
context: The tool context to use in the list_projects tool call.
names: The names of the projects to search for.
team_id: The ID of the team to search for projects in.
response_limit: The maximum number of matched projects to return.
max_items_to_scan: The maximum number of projects to scan while looking for matches.
return_projects_not_matched: Whether to return the projects that were scanned, but did not
match any of the names searched for.
"""
from arcade_asana.tools.projects import list_projects # avoid circular imports
names_lower = {name.casefold() for name in names}
projects = await paginate_tool_call(
tool=list_projects,
context=context,
response_key="projects",
max_items=max_items_to_scan,
timeout_seconds=15,
team_id=team_id,
)
matches: list[dict[str, Any]] = []
not_matched: list[str] = []
for project in projects:
project_name_lower = project["name"].casefold()
if len(matches) >= response_limit:
break
if project_name_lower in names_lower:
matches.append(project)
names_lower.remove(project_name_lower)
else:
not_matched.append(project)
not_found = [name for name in names if name.casefold() in names_lower]
response = {
"matches": {
"projects": matches,
"count": len(matches),
},
"not_found": {
"names": not_found,
"count": len(not_found),
},
}
if return_projects_not_matched:
response["not_matched"] = {
"projects": not_matched,
"count": len(not_matched),
}
return response
async def find_tags_by_name(
context: ToolContext,
names: list[str],
workspace_id: list[str] | None = None,
response_limit: int = 100,
max_items_to_scan: int = MAX_TAGS_TO_SCAN_BY_NAME,
return_tags_not_matched: bool = False,
) -> dict[str, Any]:
"""Find tags by name using exact match
This function will paginate the list_tags tool call and return the tags that match the names
provided. If the names provided are not found, it will return the names searched for that did
not match any tags.
If return_tags_not_matched is True, it will also return the tags that were scanned, but did not
match any of the names searched for.
Args:
context: The tool context to use in the list_tags tool call.
names: The names of the tags to search for.
workspace_id: The ID of the workspace to search for tags in.
response_limit: The maximum number of matched tags to return.
max_items_to_scan: The maximum number of tags to scan while looking for matches.
return_tags_not_matched: Whether to return the tags that were scanned, but did not match
any of the names searched for.
"""
from arcade_asana.tools.tags import list_tags # avoid circular imports
names_lower = {name.casefold() for name in names}
tags = await paginate_tool_call(
tool=list_tags,
context=context,
response_key="tags",
max_items=max_items_to_scan,
timeout_seconds=15,
workspace_id=workspace_id,
)
matches: list[dict[str, Any]] = []
not_matched: list[str] = []
for tag in tags:
tag_name_lower = tag["name"].casefold()
if len(matches) >= response_limit:
break
if tag_name_lower in names_lower:
matches.append(tag)
names_lower.remove(tag_name_lower)
else:
not_matched.append(tag["name"])
not_found = [name for name in names if name.casefold() in names_lower]
response = {
"matches": {
"tags": matches,
"count": len(matches),
},
"not_found": {
"tags": not_found,
"count": len(not_found),
},
}
if return_tags_not_matched:
response["not_matched"] = {
"tags": not_matched,
"count": len(not_matched),
}
return response
def get_next_page(response: dict[str, Any]) -> dict[str, Any]:
try:
token = response["next_page"]["offset"]
except (KeyError, TypeError):
token = None
return {"has_more_pages": token is not None, "next_page_token": token}

View file

@ -0,0 +1,16 @@
from unittest.mock import patch
import pytest
from arcade.sdk import ToolAuthorizationContext, ToolContext
@pytest.fixture
def mock_context():
mock_auth = ToolAuthorizationContext(token="fake-token") # noqa: S106
return ToolContext(authorization=mock_auth)
@pytest.fixture
def mock_httpx_client():
with patch("arcade_asana.models.httpx") as mock_httpx:
yield mock_httpx.AsyncClient().__aenter__.return_value

View file

@ -0,0 +1,96 @@
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
from arcade.sdk.eval.critic import BinaryCritic
import arcade_asana
from arcade_asana.tools import (
create_task,
)
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_asana)
@tool_eval()
def create_task_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="create task eval suite",
system_message="You are an AI assistant with access to Asana tools. Use them to help the user with their tasks.",
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Create task with name, description, start and due dates",
user_message="Create a task with the name 'Hello World' and the description 'This is a task description' starting on 2025-05-05 and due on 2025-05-11.",
expected_tool_calls=[
ExpectedToolCall(
func=create_task,
args={
"name": "Hello World",
"description": "This is a task description",
"start_date": "2025-05-05",
"due_date": "2025-05-11",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="name", weight=1 / 4),
BinaryCritic(critic_field="description", weight=1 / 4),
BinaryCritic(critic_field="start_date", weight=1 / 4),
BinaryCritic(critic_field="due_date", weight=1 / 4),
],
)
suite.add_case(
name="Create task with name and tag names",
user_message="Create a task with the name 'Hello World' and the tags 'My Tag' and 'My Other Tag'.",
expected_tool_calls=[
ExpectedToolCall(
func=create_task,
args={
"name": "Hello World",
"tags": ["My Tag", "My Other Tag"],
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="name", weight=0.5),
BinaryCritic(critic_field="tags", weight=0.5),
],
)
suite.add_case(
name="Create task with name and tag IDs",
user_message="Create a task with the name 'Hello World' and the tags '1234567890' and '1234567891'.",
expected_tool_calls=[
ExpectedToolCall(
func=create_task,
args={
"name": "Hello World",
"tags": ["1234567890", "1234567891"],
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="name", weight=0.5),
BinaryCritic(critic_field="tags", weight=0.5),
],
)
return suite

View file

@ -0,0 +1,197 @@
import json
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
from arcade.sdk.eval.critic import BinaryCritic
import arcade_asana
from arcade_asana.tools import get_project_by_id, list_projects
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_asana)
@tool_eval()
def list_projects_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="list projects eval suite",
system_message=(
"You are an AI assistant with access to Asana tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="List projects",
user_message="List the projects in Asana.",
expected_tool_calls=[
ExpectedToolCall(
func=list_projects,
args={
"team_ids": None,
"limit": 100,
"offset": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="team_ids", weight=0.4),
BinaryCritic(critic_field="limit", weight=0.3),
BinaryCritic(critic_field="offset", weight=0.3),
],
)
suite.add_case(
name="List projects filtering by teams",
user_message="List the projects in Asana for the team '1234567890'.",
expected_tool_calls=[
ExpectedToolCall(
func=list_projects,
args={
"team_ids": ["1234567890"],
"limit": 100,
"offset": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="team_ids", weight=0.6),
BinaryCritic(critic_field="limit", weight=0.2),
BinaryCritic(critic_field="offset", weight=0.2),
],
)
suite.add_case(
name="List projects with limit",
user_message="List 10 projects in Asana.",
expected_tool_calls=[
ExpectedToolCall(
func=list_projects,
args={
"team_ids": None,
"limit": 10,
"offset": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="team_ids", weight=0.2),
BinaryCritic(critic_field="limit", weight=0.6),
BinaryCritic(critic_field="offset", weight=0.2),
],
)
suite.add_case(
name="List projects with pagination",
user_message="Show me the next 2 projects.",
expected_tool_calls=[
ExpectedToolCall(
func=list_projects,
args={
"limit": 2,
"offset": "abc123",
"team_ids": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.45),
BinaryCritic(critic_field="offset", weight=0.45),
BinaryCritic(critic_field="team_ids", weight=0.1),
],
additional_messages=[
{"role": "user", "content": "Show me 2 projects in Asana."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Asana_ListProjects",
"arguments": '{"limit":2}',
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"count": 2,
"next_page": {
"has_more_results": True,
"next_page_token": "abc123",
},
"workspaces": [
{
"id": "1234567890",
"name": "Project Hello",
},
{
"id": "1234567891",
"name": "Project World",
},
],
}),
"tool_call_id": "call_1",
"name": "Asana_ListProjects",
},
{
"role": "assistant",
"content": "Here are two projects in Asana:\n\n1. Project Hello\n2. Project World",
},
],
)
return suite
@tool_eval()
def get_project_by_id_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="get project by id eval suite",
system_message="You are an AI assistant with access to Asana tools. Use them to help the user with their tasks.",
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Get project by id",
user_message="Get the project with id '1234567890' in Asana.",
expected_tool_calls=[
ExpectedToolCall(
func=get_project_by_id,
args={
"project_id": "1234567890",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="name", weight=0.7),
BinaryCritic(critic_field="description", weight=0.1),
BinaryCritic(critic_field="color", weight=0.1),
BinaryCritic(critic_field="workspace_id", weight=0.1),
],
)
return suite

View file

@ -0,0 +1,281 @@
import json
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
from arcade.sdk.eval.critic import BinaryCritic
import arcade_asana
from arcade_asana.tools import create_tag, list_tags
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_asana)
@tool_eval()
def list_tags_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="list tags eval suite",
system_message=(
"You are an AI assistant with access to Asana tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="List tags",
user_message="List the tags in Asana.",
expected_tool_calls=[
ExpectedToolCall(
func=list_tags,
args={
"limit": 100,
"offset": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.5),
BinaryCritic(critic_field="offset", weight=0.5),
],
)
suite.add_case(
name="List tags with limit",
user_message="List 10 tags in Asana.",
expected_tool_calls=[
ExpectedToolCall(
func=list_tags,
args={
"limit": 10,
"offset": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.75),
BinaryCritic(critic_field="offset", weight=0.25),
],
)
suite.add_case(
name="List tags with pagination",
user_message="Show me the next 2 tags.",
expected_tool_calls=[
ExpectedToolCall(
func=list_tags,
args={
"limit": 2,
"offset": "abc123",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.5),
BinaryCritic(critic_field="offset", weight=0.5),
],
additional_messages=[
{"role": "user", "content": "Show me 2 tags in Asana."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Asana_ListTags",
"arguments": '{"limit":2}',
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"count": 2,
"next_page": {
"has_more_results": True,
"next_page_token": "abc123",
},
"workspaces": [
{
"id": "1234567890",
"name": "Tag Hello",
},
{
"id": "1234567891",
"name": "Tag World",
},
],
}),
"tool_call_id": "call_1",
"name": "Asana_ListTags",
},
{
"role": "assistant",
"content": "Here are two tags in Asana:\n\n1. Tag Hello\n2. Tag World",
},
],
)
suite.add_case(
name="List tags with pagination changing the limit",
user_message="Show me the next 5 tags.",
expected_tool_calls=[
ExpectedToolCall(
func=list_tags,
args={
"limit": 5,
"offset": "abc123",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.5),
BinaryCritic(critic_field="offset", weight=0.5),
],
additional_messages=[
{"role": "user", "content": "Show me 5 tags in Asana."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Asana_ListTags",
"arguments": '{"limit":5}',
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"count": 2,
"next_page": {
"has_more_results": True,
"next_page_token": "abc123",
},
"workspaces": [
{
"id": "1234567890",
"name": "Tag Hello",
},
{
"id": "1234567891",
"name": "Tag World",
},
],
}),
"tool_call_id": "call_1",
"name": "Asana_ListTags",
},
{
"role": "assistant",
"content": "Here are two tags in Asana:\n\n1. Tag Hello\n2. Tag World",
},
],
)
return suite
@tool_eval()
def create_tag_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="create tag eval suite",
system_message="You are an AI assistant with access to Asana tools. Use them to help the user with their tasks.",
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Create tag",
user_message="Create a 'Hello World' tag in Asana.",
expected_tool_calls=[
ExpectedToolCall(
func=create_tag,
args={
"name": "Hello World",
"description": None,
"color": None,
"workspace_id": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="name", weight=0.7),
BinaryCritic(critic_field="description", weight=0.1),
BinaryCritic(critic_field="color", weight=0.1),
BinaryCritic(critic_field="workspace_id", weight=0.1),
],
)
suite.add_case(
name="Create tag with description and color",
user_message="Create a dark orange tag 'Attention' in Asana with the description 'This is a tag for important tasks'.",
expected_tool_calls=[
ExpectedToolCall(
func=create_tag,
args={
"name": "Attention",
"description": "This is a tag for important tasks",
"color": "dark-orange",
"workspace_id": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="name", weight=0.3),
BinaryCritic(critic_field="description", weight=0.3),
BinaryCritic(critic_field="color", weight=0.3),
BinaryCritic(critic_field="workspace_id", weight=0.1),
],
)
suite.add_case(
name="Create tag in a specific workspace",
user_message="Create a dark orange tag 'Attention' in Asana with the description 'This is a tag for important tasks' in the workspace '1234567890'.",
expected_tool_calls=[
ExpectedToolCall(
func=create_tag,
args={
"name": "Attention",
"description": "This is a tag for important tasks",
"color": "dark-orange",
"workspace_id": "1234567890",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="name", weight=0.25),
BinaryCritic(critic_field="description", weight=0.25),
BinaryCritic(critic_field="color", weight=0.25),
BinaryCritic(critic_field="workspace_id", weight=0.25),
],
)
return suite

View file

@ -0,0 +1,442 @@
import json
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
from arcade.sdk.eval.critic import BinaryCritic
import arcade_asana
from arcade_asana.constants import SortOrder, TaskSortBy
from arcade_asana.tools import (
get_subtasks_from_a_task,
get_task_by_id,
get_tasks_without_id,
update_task,
)
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_asana)
@tool_eval()
def get_task_by_id_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="get task by id eval suite",
system_message=(
"You are an AI assistant with access to Asana tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Get task by id",
user_message="Get the task with id '1234567890' in Asana.",
expected_tool_calls=[
ExpectedToolCall(
func=get_task_by_id,
args={
"task_id": "1234567890",
"max_subtasks": 100,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="task_id", weight=0.8),
BinaryCritic(critic_field="max_subtasks", weight=0.2),
],
)
suite.add_case(
name="Get task by id with subtasks limit",
user_message="Get the task with id '1234567890' in Asana with up to 10 subtasks.",
expected_tool_calls=[
ExpectedToolCall(
func=get_task_by_id,
args={
"task_id": "1234567890",
"max_subtasks": 10,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="task_id", weight=0.5),
BinaryCritic(critic_field="max_subtasks", weight=0.5),
],
)
return suite
@tool_eval()
def get_subtasks_from_a_task_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="get subtasks from a task eval suite",
system_message="You are an AI assistant with access to Asana tools. Use them to help the user with their tasks.",
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Get subtasks from a task",
user_message="Get the next 2 subtasks.",
expected_tool_calls=[
ExpectedToolCall(
func=get_subtasks_from_a_task,
args={
"task_id": "1234567890",
"limit": 2,
"offset": "abc123",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="task_id", weight=1 / 3),
BinaryCritic(critic_field="limit", weight=1 / 3),
BinaryCritic(critic_field="offset", weight=1 / 3),
],
additional_messages=[
{"role": "user", "content": "Get 2 subtasks from the task '1234567890'."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Asana_GetSubtasksFromATask",
"arguments": '{"task_id":"1234567890","limit":2}',
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"count": 2,
"next_page": {
"has_more_results": True,
"next_page_token": "abc123",
},
"subtasks": [
{
"id": "1234567890",
"name": "Subtask Hello",
},
{
"id": "1234567891",
"name": "Subtask World",
},
],
}),
"tool_call_id": "call_1",
"name": "Asana_GetSubtasksFromATask",
},
{
"role": "assistant",
"content": "Here are two subtasks in Asana:\n\n1. Subtask Hello\n2. Subtask World",
},
],
)
return suite
@tool_eval()
def search_tasks_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="search tasks eval suite",
system_message="You are an AI assistant with access to Asana tools. Use them to help the user with their tasks.",
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Search tasks by name",
user_message="Search for the task 'Hello' in Asana.",
expected_tool_calls=[
ExpectedToolCall(
func=get_tasks_without_id,
args={
"keywords": "Hello",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="keywords", weight=1),
],
)
suite.add_case(
name="Search tasks by name with custom sorting",
user_message="Search for the task 'Hello' in Asana sorting by likes in descending order.",
expected_tool_calls=[
ExpectedToolCall(
func=get_tasks_without_id,
args={
"keywords": "Hello",
"sort_by": TaskSortBy.LIKES,
"sort_order": SortOrder.DESCENDING,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="keywords", weight=1 / 3),
BinaryCritic(critic_field="sort_by", weight=1 / 3),
BinaryCritic(critic_field="sort_order", weight=1 / 3),
],
)
suite.add_case(
name="Search tasks by name filtering by project ID",
user_message="Search for the task 'Hello' associated to the project with ID '1234567890'.",
expected_tool_calls=[
ExpectedToolCall(
func=get_tasks_without_id,
args={
"keywords": "Hello",
"project_id": "1234567890",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="keywords", weight=0.5),
BinaryCritic(critic_field="project_id", weight=0.5),
],
)
suite.add_case(
name="Search tasks by name filtering by project name",
user_message="Search for the task 'Hello' associated to the project named 'My Project'.",
expected_tool_calls=[
ExpectedToolCall(
func=get_tasks_without_id,
args={
"keywords": "Hello",
"project_name": "My Project",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="keywords", weight=0.5),
BinaryCritic(critic_field="project_name", weight=0.5),
],
)
suite.add_case(
name="Search tasks by name filtering by team ID",
user_message="Search for the task 'Hello' associated to the team with ID '1234567890'.",
expected_tool_calls=[
ExpectedToolCall(
func=get_tasks_without_id,
args={
"keywords": "Hello",
"team_id": "1234567890",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="keywords", weight=0.5),
BinaryCritic(critic_field="team_id", weight=0.5),
],
)
suite.add_case(
name="Search tasks by name filtering by tag IDs",
user_message="Search for the task 'Hello' associated to the tags with IDs '1234567890' and '1234567891'.",
expected_tool_calls=[
ExpectedToolCall(
func=get_tasks_without_id,
args={
"keywords": "Hello",
"tags": ["1234567890", "1234567891"],
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="keywords", weight=0.5),
BinaryCritic(critic_field="tag_ids", weight=0.5),
],
)
suite.add_case(
name="Search tasks by name filtering by tags names",
user_message="Search for the task 'Hello' associated to the tags 'My Tag' and 'My Other Tag'.",
expected_tool_calls=[
ExpectedToolCall(
func=get_tasks_without_id,
args={
"keywords": "Hello",
"tags": ["My Tag", "My Other Tag"],
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="keywords", weight=0.5),
BinaryCritic(critic_field="tag_names", weight=0.5),
],
)
suite.add_case(
name="Search tasks by name filtering by start and due dates",
user_message="Search for tasks 'Hello' that started on '2025-01-01' and are due on '2025-01-02'.",
expected_tool_calls=[
ExpectedToolCall(
func=get_tasks_without_id,
args={
"keywords": "Hello",
"start_on": "2025-01-01",
"due_on": "2025-01-02",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="keywords", weight=1 / 3),
BinaryCritic(critic_field="start_on", weight=1 / 3),
BinaryCritic(critic_field="due_on", weight=1 / 3),
],
)
suite.add_case(
name="Search tasks by name filtering by start and due dates",
user_message="Search for tasks 'Hello' that start on 2025-05-05 and are due on or before 2025-05-11.",
expected_tool_calls=[
ExpectedToolCall(
func=get_tasks_without_id,
args={
"keywords": "Hello",
"start_on": "2025-05-05",
"due_on_or_before": "2025-05-11",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="keywords", weight=1 / 3),
BinaryCritic(critic_field="start_on", weight=1 / 3),
BinaryCritic(critic_field="due_on_or_before", weight=1 / 3),
],
)
suite.add_case(
name="Search not-completed tasks by name filtering by due date",
user_message="Search for tasks 'Hello' that are not completed and are due on or before 2025-05-11.",
expected_tool_calls=[
ExpectedToolCall(
func=get_tasks_without_id,
args={
"keywords": "Hello",
"due_on_or_before": "2025-05-11",
"completed": False,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="keywords", weight=1 / 3),
BinaryCritic(critic_field="due_on_or_before", weight=1 / 3),
BinaryCritic(critic_field="completed", weight=1 / 3),
],
)
return suite
@tool_eval()
def update_task_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="update task eval suite",
system_message="You are an AI assistant with access to Asana tools. Use them to help the user with their tasks.",
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Update task name",
user_message="Update the task '1234567890' with the name 'Hello World'.",
expected_tool_calls=[
ExpectedToolCall(
func=update_task,
args={"task_id": "1234567890", "name": "Hello World"},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="task_id", weight=0.5),
BinaryCritic(critic_field="name", weight=0.5),
],
)
suite.add_case(
name="Update task as completed",
user_message="Mark the task '1234567890' as completed.",
expected_tool_calls=[
ExpectedToolCall(
func=update_task,
args={"task_id": "1234567890", "completed": True},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="task_id", weight=0.5),
BinaryCritic(critic_field="completed", weight=0.5),
],
)
suite.add_case(
name="Update task with new parent task",
user_message="Update the task '1234567890' with the parent task '1234567891'.",
expected_tool_calls=[
ExpectedToolCall(
func=update_task,
args={"task_id": "1234567890", "parent_task_id": "1234567891"},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="task_id", weight=0.5),
BinaryCritic(critic_field="parent_task_id", weight=0.5),
],
)
suite.add_case(
name="Update task with new assignee",
user_message="Update the task '1234567890' with the assignee '1234567891'.",
expected_tool_calls=[
ExpectedToolCall(
func=update_task,
args={"task_id": "1234567890", "assignee_id": "1234567891"},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="task_id", weight=0.5),
BinaryCritic(critic_field="assignee_id", weight=0.5),
],
)
return suite

View file

@ -0,0 +1,269 @@
import json
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
from arcade.sdk.eval.critic import BinaryCritic
import arcade_asana
from arcade_asana.tools import get_team_by_id, list_teams_the_current_user_is_a_member_of
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_asana)
@tool_eval()
def get_team_by_id_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="get team by id eval suite",
system_message=(
"You are an AI assistant with access to Asana tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Get team by id",
user_message="Get the team with ID 1234567890.",
expected_tool_calls=[
ExpectedToolCall(
func=get_team_by_id,
args={
"team_id": "1234567890",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="team_id", weight=1),
],
)
return suite
@tool_eval()
def list_teams_the_current_user_is_a_member_of_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="list teams the current user is a member of eval suite",
system_message=(
"You are an AI assistant with access to Asana tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="List teams the current user is a member of",
user_message="List the teams the current user is a member of.",
expected_tool_calls=[
ExpectedToolCall(
func=list_teams_the_current_user_is_a_member_of,
args={},
),
],
rubric=rubric,
critics=[],
)
suite.add_case(
name="List teams I am a member of",
user_message="List the teams I'm a member of.",
expected_tool_calls=[
ExpectedToolCall(
func=list_teams_the_current_user_is_a_member_of,
args={},
),
],
rubric=rubric,
critics=[],
)
suite.add_case(
name="List teams I am a member of",
user_message="What teams am I a member of in asana?",
expected_tool_calls=[
ExpectedToolCall(
func=list_teams_the_current_user_is_a_member_of,
args={},
),
],
rubric=rubric,
critics=[],
)
suite.add_case(
name="List teams the current user is a member of filtering by workspace",
user_message="List the teams the current user is a member of in the workspace 1234567890.",
expected_tool_calls=[
ExpectedToolCall(
func=list_teams_the_current_user_is_a_member_of,
args={
"workspace_ids": ["1234567890"],
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="workspace_ids", weight=1),
],
)
suite.add_case(
name="List up to 5 teams the current user is a member of filtering by workspace",
user_message="List up to 5 teams the current user is a member of in the workspace 1234567890.",
expected_tool_calls=[
ExpectedToolCall(
func=list_teams_the_current_user_is_a_member_of,
args={
"workspace_ids": ["1234567890"],
"limit": 5,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="workspace_ids", weight=0.5),
BinaryCritic(critic_field="limit", weight=0.5),
],
)
suite.add_case(
name="List teams with pagination",
user_message="Show me the next 2 teams.",
expected_tool_calls=[
ExpectedToolCall(
func=list_teams_the_current_user_is_a_member_of,
args={
"limit": 2,
"offset": "abc123",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="offset", weight=0.5),
BinaryCritic(critic_field="limit", weight=0.5),
],
additional_messages=[
{"role": "user", "content": "Show me 2 teams I'm a member of in Asana."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Asana_ListTeamsTheCurrentUserIsAMemberOf",
"arguments": '{"limit":2}',
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"count": 1,
"next_page": {
"has_more_results": True,
"next_page_token": "abc123",
},
"teams": [
{
"id": "1234567890",
"name": "Team Hello",
},
{
"id": "1234567891",
"name": "Team World",
},
],
}),
"tool_call_id": "call_1",
"name": "Asana_ListTeamsTheCurrentUserIsAMemberOf",
},
{
"role": "assistant",
"content": "Here are two teams you're a member of in Asana:\n\n1. Team Hello\n2. Team World",
},
],
)
suite.add_case(
name="List teams with pagination changing the limit",
user_message="Show me the next 5 teams.",
expected_tool_calls=[
ExpectedToolCall(
func=list_teams_the_current_user_is_a_member_of,
args={
"limit": 5,
"offset": "abc123",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.5),
BinaryCritic(critic_field="offset", weight=0.5),
],
additional_messages=[
{"role": "user", "content": "Show me 2 teams I'm a member of in Asana."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Asana_ListTeamsTheCurrentUserIsAMemberOf",
"arguments": '{"limit":2}',
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"count": 1,
"next_page": {
"has_more_results": True,
"next_page_token": "abc123",
},
"teams": [
{
"id": "1234567890",
"name": "Team Hello",
},
{
"id": "1234567891",
"name": "Team World",
},
],
}),
"tool_call_id": "call_1",
"name": "Asana_ListTeamsTheCurrentUserIsAMemberOf",
},
{
"role": "assistant",
"content": "Here are two teams you're a member of in Asana:\n\n1. Team Hello\n2. Team World",
},
],
)
return suite

View file

@ -0,0 +1,253 @@
import json
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
from arcade.sdk.eval.critic import BinaryCritic
import arcade_asana
from arcade_asana.tools import get_user_by_id, list_users
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_asana)
@tool_eval()
def get_user_by_id_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="get user by id eval suite",
system_message=(
"You are an AI assistant with access to Asana tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Get user by id",
user_message="Get the user with ID 1234567890.",
expected_tool_calls=[
ExpectedToolCall(
func=get_user_by_id,
args={
"user_id": "1234567890",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="user_id", weight=0.1),
],
)
return suite
@tool_eval()
def list_users_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="list users eval suite",
system_message=(
"You are an AI assistant with access to Asana tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="List users",
user_message="List the users in Asana.",
expected_tool_calls=[
ExpectedToolCall(
func=list_users,
args={
"workspace_id": None,
"limit": 100,
"offset": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="workspace_id", weight=0.3),
BinaryCritic(critic_field="limit", weight=0.3),
BinaryCritic(critic_field="offset", weight=0.4),
],
)
suite.add_case(
name="List users filtering by workspace",
user_message="List the users in the workspace 1234567890.",
expected_tool_calls=[
ExpectedToolCall(
func=list_users,
args={
"workspace_id": "1234567890",
"limit": 100,
"offset": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="workspace_id", weight=0.8),
BinaryCritic(critic_field="limit", weight=0.1),
BinaryCritic(critic_field="offset", weight=0.1),
],
)
suite.add_case(
name="List users with limit",
user_message="List up to 5 users.",
expected_tool_calls=[
ExpectedToolCall(
func=list_users,
args={
"limit": 5,
"workspace_id": None,
"offset": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.8),
BinaryCritic(critic_field="workspace_id", weight=0.1),
BinaryCritic(critic_field="offset", weight=0.1),
],
)
suite.add_case(
name="List users with pagination",
user_message="Show me the next 2 users.",
expected_tool_calls=[
ExpectedToolCall(
func=list_users,
args={
"workspace_id": None,
"limit": 2,
"offset": 2,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.45),
BinaryCritic(critic_field="offset", weight=0.45),
BinaryCritic(critic_field="workspace_id", weight=0.1),
],
additional_messages=[
{"role": "user", "content": "Show me 2 users in Asana."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Asana_ListUsers",
"arguments": '{"limit":2}',
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"count": 2,
"users": [
{
"id": "1234567890",
"name": "User Hello",
},
{
"id": "1234567891",
"name": "User World",
},
],
}),
"tool_call_id": "call_1",
"name": "Asana_ListUsers",
},
{
"role": "assistant",
"content": "Here are two users in Asana:\n\n1. User Hello\n2. User World",
},
],
)
suite.add_case(
name="List users with pagination changing the limit",
user_message="Show me the next 5 users.",
expected_tool_calls=[
ExpectedToolCall(
func=list_users,
args={
"workspace_id": None,
"limit": 5,
"offset": 2,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.45),
BinaryCritic(critic_field="offset", weight=0.45),
BinaryCritic(critic_field="workspace_id", weight=0.1),
],
additional_messages=[
{"role": "user", "content": "Show me 2 users in Asana."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Asana_ListUsers",
"arguments": '{"limit":2}',
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"count": 2,
"users": [
{
"id": "1234567890",
"name": "User Hello",
},
{
"id": "1234567891",
"name": "User World",
},
],
}),
"tool_call_id": "call_1",
"name": "Asana_ListUsers",
},
{
"role": "assistant",
"content": "Here are two users in Asana:\n\n1. User Hello\n2. User World",
},
],
)
return suite

View file

@ -0,0 +1,177 @@
import json
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
from arcade.sdk.eval.critic import BinaryCritic
import arcade_asana
from arcade_asana.tools import list_workspaces
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_asana)
@tool_eval()
def list_workspaces_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="list workspaces eval suite",
system_message=(
"You are an AI assistant with access to Asana tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="List workspaces",
user_message="List the workspaces in Asana.",
expected_tool_calls=[
ExpectedToolCall(
func=list_workspaces,
args={
"limit": 100,
"offset": 0,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.5),
BinaryCritic(critic_field="offset", weight=0.5),
],
)
suite.add_case(
name="List workspaces with pagination",
user_message="Show me the next 2 workspaces.",
expected_tool_calls=[
ExpectedToolCall(
func=list_workspaces,
args={
"limit": 2,
"offset": 2,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.5),
BinaryCritic(critic_field="offset", weight=0.5),
],
additional_messages=[
{"role": "user", "content": "Show me 2 workspaces in Asana."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Asana_ListWorkspaces",
"arguments": '{"limit":2}',
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"count": 2,
"workspaces": [
{
"id": "1234567890",
"name": "Workspace Hello",
},
{
"id": "1234567891",
"name": "Workspace World",
},
],
}),
"tool_call_id": "call_1",
"name": "Asana_ListWorkspaces",
},
{
"role": "assistant",
"content": "Here are two workspaces in Asana:\n\n1. Workspace Hello\n2. Workspace World",
},
],
)
suite.add_case(
name="List workspaces with pagination changing the limit",
user_message="Show me the next 5 workspaces.",
expected_tool_calls=[
ExpectedToolCall(
func=list_workspaces,
args={
"limit": 5,
"offset": "abc123",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.5),
BinaryCritic(critic_field="offset", weight=0.5),
],
additional_messages=[
{"role": "user", "content": "Show me 5 workspaces in Asana."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Asana_ListWorkspaces",
"arguments": '{"limit":5}',
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"count": 2,
"next_page": {
"has_more_results": True,
"next_page_token": "abc123",
},
"workspaces": [
{
"id": "1234567890",
"name": "Workspace Hello",
},
{
"id": "1234567891",
"name": "Workspace World",
},
],
}),
"tool_call_id": "call_1",
"name": "Asana_ListWorkspaces",
},
{
"role": "assistant",
"content": "Here are two workspaces in Asana:\n\n1. Workspace Hello\n2. Workspace World",
},
],
)
return suite

View file

@ -0,0 +1,42 @@
[tool.poetry]
name = "arcade_asana"
version = "0.1.0"
description = "Arcade tools designed for LLMs to interact with Asana"
authors = ["Arcade <dev@arcade.dev>"]
[tool.poetry.dependencies]
python = "^3.10"
arcade-ai = ">=1.4.0,<2.0"
httpx = "^0.27.2"
[tool.poetry.dev-dependencies]
pytest = "^8.3.0"
pytest-cov = "^4.0.0"
pytest-asyncio = "^0.24.0"
pytest-mock = "^3.11.1"
mypy = "^1.5.1"
pre-commit = "^3.4.0"
tox = "^4.11.1"
ruff = "^0.7.4"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.mypy]
files = ["arcade_asana/**/*.py"]
python_version = "3.10"
disallow_untyped_defs = "True"
disallow_any_unimported = "True"
no_implicit_optional = "True"
check_untyped_defs = "True"
warn_return_any = "True"
warn_unused_ignores = "True"
show_error_codes = "True"
ignore_missing_imports = "True"
[tool.pytest.ini_options]
testpaths = ["tests"]
[tool.coverage.report]
skip_empty = true

View file

@ -0,0 +1,78 @@
from unittest.mock import patch
import pytest
from arcade.sdk.errors import RetryableToolError
from arcade_asana.utils import (
get_project_by_name_or_raise_error,
get_tag_ids,
get_unique_workspace_id_or_raise_error,
)
@pytest.mark.asyncio
@patch("arcade_asana.utils.find_tags_by_name")
async def test_get_tag_ids(mock_find_tags_by_name, mock_context):
assert await get_tag_ids(mock_context, None) is None
assert await get_tag_ids(mock_context, ["1234567890", "1234567891"]) == [
"1234567890",
"1234567891",
]
mock_find_tags_by_name.return_value = {
"matches": {
"tags": [
{"id": "1234567890", "name": "My Tag"},
{"id": "1234567891", "name": "My Other Tag"},
]
},
"not_found": {"tags": []},
}
assert await get_tag_ids(mock_context, ["My Tag", "My Other Tag"]) == [
"1234567890",
"1234567891",
]
@pytest.mark.asyncio
@patch("arcade_asana.tools.workspaces.list_workspaces")
async def test_get_unique_workspace_id_or_raise_error(mock_list_workspaces, mock_context):
mock_list_workspaces.return_value = {
"workspaces": [
{"id": "1234567890", "name": "My Workspace"},
]
}
assert await get_unique_workspace_id_or_raise_error(mock_context) == "1234567890"
mock_list_workspaces.return_value = {
"workspaces": [
{"id": "1234567890", "name": "My Workspace"},
{"id": "1234567891", "name": "My Other Workspace"},
]
}
with pytest.raises(RetryableToolError) as exc_info:
await get_unique_workspace_id_or_raise_error(mock_context)
assert "My Other Workspace" in exc_info.value.additional_prompt_content
@pytest.mark.asyncio
@patch("arcade_asana.utils.find_projects_by_name")
async def test_get_project_by_name_or_raise_error(mock_find_projects_by_name, mock_context):
project1 = {"id": "1234567890", "name": "My Project"}
mock_find_projects_by_name.return_value = {
"matches": {"projects": [project1], "count": 1},
"not_matched": {"projects": [], "count": 0},
}
assert await get_project_by_name_or_raise_error(mock_context, project1["name"]) == project1
mock_find_projects_by_name.return_value = {
"matches": {"projects": [], "count": 0},
"not_matched": {"projects": [project1], "count": 1},
}
with pytest.raises(RetryableToolError) as exc_info:
await get_project_by_name_or_raise_error(mock_context, "Inexistent Project")
assert project1["name"] in exc_info.value.additional_prompt_content