Jira Toolkit (#407)

Unusually big PR, in large part due to Jira's data model complexities.
This commit is contained in:
Renato Byrro 2025-06-11 12:06:33 -03:00 committed by GitHub
parent 3b50c0bb6e
commit a538f4bf81
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 6205 additions and 0 deletions

View file

@ -5,6 +5,7 @@ arcade-dropbox
arcade-github
arcade-google
arcade-hubspot
arcade-jira
arcade-linkedin
arcade-math
arcade-microsoft

View file

@ -0,0 +1,18 @@
files: ^./
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: "v4.4.0"
hooks:
- id: check-case-conflict
- id: check-merge-conflict
- id: check-toml
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.7
hooks:
- id: ruff
args: [--fix]
- id: ruff-format

46
toolkits/jira/.ruff.toml Normal file
View file

@ -0,0 +1,46 @@
target-version = "py39"
line-length = 100
fix = true
[lint]
select = [
# flake8-2020
"YTT",
# flake8-bandit
"S",
# flake8-bugbear
"B",
# flake8-builtins
"A",
# flake8-comprehensions
"C4",
# flake8-debugger
"T10",
# flake8-simplify
"SIM",
# isort
"I",
# mccabe
"C90",
# pycodestyle
"E", "W",
# pyflakes
"F",
# pygrep-hooks
"PGH",
# pyupgrade
"UP",
# ruff
"RUF",
# tryceratops
"TRY",
]
[lint.per-file-ignores]
"*" = ["TRY003", "B904"]
"**/tests/*" = ["S101", "E501"]
"**/evals/*" = ["S101", "E501"]
[format]
preview = true
skip-magic-trailing-comma = false

21
toolkits/jira/LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Arcade
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

53
toolkits/jira/Makefile Normal file
View file

@ -0,0 +1,53 @@
.PHONY: help
help:
@echo "🛠️ dropbox Commands:\n"
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
.PHONY: install
install: ## Install the poetry environment and install the pre-commit hooks
@echo "📦 Checking if Poetry is installed"
@if ! command -v poetry &> /dev/null; then \
echo "📦 Installing Poetry with pip"; \
pip install poetry==1.8.5; \
else \
echo "📦 Poetry is already installed"; \
fi
@echo "🚀 Installing package in development mode with all extras"
poetry install --all-extras
.PHONY: build
build: clean-build ## Build wheel file using poetry
@echo "🚀 Creating wheel file"
poetry build
.PHONY: clean-build
clean-build: ## clean build artifacts
@echo "🗑️ Cleaning dist directory"
rm -rf dist
.PHONY: test
test: ## Test the code with pytest
@echo "🚀 Testing code: Running pytest"
@poetry run pytest -W ignore -v --cov --cov-config=pyproject.toml --cov-report=xml
.PHONY: coverage
coverage: ## Generate coverage report
@echo "coverage report"
coverage report
@echo "Generating coverage report"
coverage html
.PHONY: bump-version
bump-version: ## Bump the version in the pyproject.toml file
@echo "🚀 Bumping version in pyproject.toml"
poetry version patch
.PHONY: check
check: ## Run code quality tools.
@echo "🚀 Checking Poetry lock file consistency with 'pyproject.toml': Running poetry check"
@poetry check
@echo "🚀 Linting code: Running pre-commit"
@poetry run pre-commit run -a
@echo "🚀 Static type checking: Running mypy"
@poetry run mypy --config-file=pyproject.toml

View file

View file

@ -0,0 +1,105 @@
import asyncio
from collections import OrderedDict
from threading import Lock
from typing import Generic, TypeVar
from arcade_jira.constants import JIRA_CACHE_MAX_ITEMS
T = TypeVar("T")
class LRUCache(Generic[T]):
def __init__(self, max_size: int):
self.cache: OrderedDict[str, T] = OrderedDict()
self.max_size = max_size
self.thread_lock = Lock()
self.async_lock = asyncio.Lock()
# Thread-safe synchronous methods
def get(self, key: str) -> T | None:
with self.thread_lock:
if key not in self.cache:
return None
value = self.cache.pop(key)
self.cache[key] = value
return value
def set(self, key: str, value: T) -> None:
with self.thread_lock:
if key in self.cache:
self.cache.pop(key)
elif len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[key] = value
# Async-safe methods
async def async_get(self, key: str) -> T | None:
async with self.async_lock:
if key not in self.cache:
return None
value = self.cache.pop(key)
self.cache[key] = value
return value
async def async_set(self, key: str, value: T) -> None:
async with self.async_lock:
if key in self.cache:
self.cache.pop(key)
elif len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[key] = value
CLOUD_ID_CACHE = LRUCache[str](max_size=JIRA_CACHE_MAX_ITEMS)
CLOUD_NAME_CACHE = LRUCache[str](max_size=JIRA_CACHE_MAX_ITEMS)
CLIENT_SEMAPHORE_CACHE = LRUCache[asyncio.Semaphore](max_size=JIRA_CACHE_MAX_ITEMS)
def get_cloud_id(auth_token: str) -> str | None:
return CLOUD_ID_CACHE.get(auth_token)
def get_cloud_name(auth_token: str) -> str | None:
return CLOUD_NAME_CACHE.get(auth_token)
def set_cloud_id(auth_token: str, cloud_id: str) -> None:
CLOUD_ID_CACHE.set(auth_token, cloud_id)
def set_cloud_name(auth_token: str, cloud_name: str) -> None:
CLOUD_NAME_CACHE.set(auth_token, cloud_name)
def get_jira_client_semaphore(auth_token: str) -> asyncio.Semaphore | None:
return CLIENT_SEMAPHORE_CACHE.get(auth_token)
def set_jira_client_semaphore(auth_token: str, semaphore: asyncio.Semaphore) -> None:
CLIENT_SEMAPHORE_CACHE.set(auth_token, semaphore)
async def async_get_cloud_id(auth_token: str) -> str | None:
return await CLOUD_ID_CACHE.async_get(auth_token)
async def async_get_cloud_name(auth_token: str) -> str | None:
return await CLOUD_NAME_CACHE.async_get(auth_token)
async def async_set_cloud_id(auth_token: str, cloud_id: str) -> None:
await CLOUD_ID_CACHE.async_set(auth_token, cloud_id)
async def async_set_cloud_name(auth_token: str, cloud_name: str) -> None:
await CLOUD_NAME_CACHE.async_set(auth_token, cloud_name)
async def async_get_jira_client_semaphore(auth_token: str) -> asyncio.Semaphore | None:
return await CLIENT_SEMAPHORE_CACHE.async_get(auth_token)
async def async_set_jira_client_semaphore(auth_token: str, semaphore: asyncio.Semaphore) -> None:
await CLIENT_SEMAPHORE_CACHE.async_set(auth_token, semaphore)

View file

@ -0,0 +1,226 @@
import asyncio
import json
import json.decoder
from dataclasses import dataclass
from typing import Any, Optional, cast
import httpx
import arcade_jira.cache as cache
from arcade_jira.constants import JIRA_API_VERSION, JIRA_BASE_URL, JIRA_MAX_CONCURRENT_REQUESTS
from arcade_jira.exceptions import JiraToolExecutionError, NotFoundError
@dataclass
class JiraClient:
auth_token: str
base_url: str = JIRA_BASE_URL
api_version: str = JIRA_API_VERSION
max_concurrent_requests: int = JIRA_MAX_CONCURRENT_REQUESTS
_semaphore: asyncio.Semaphore | None = None
_cloud_id: str | None = None
def __post_init__(self) -> None:
if not self._semaphore:
cached_semaphore = cache.get_jira_client_semaphore(self.auth_token)
if cached_semaphore:
self._semaphore = cached_semaphore
else:
self._semaphore = asyncio.Semaphore(self.max_concurrent_requests)
cache.set_jira_client_semaphore(self.auth_token, self._semaphore)
self.base_url = self.base_url.rstrip("/")
self.api_version = self.api_version.strip("/")
async def get_cloud_id(self) -> str:
if self._cloud_id is None:
if (cloud_id := await cache.async_get_cloud_id(self.auth_token)) is not None:
self._cloud_id = cloud_id
else:
cloud = await self._get_cloud_data_from_available_resources()
self._cloud_id = cloud["id"]
await cache.async_set_cloud_id(self.auth_token, cloud["id"])
await cache.async_set_cloud_name(self.auth_token, cloud["name"])
return self._cloud_id
async def _build_url(self, endpoint: str) -> str:
cloud_id = await self.get_cloud_id()
return f"{self.base_url}/{cloud_id}/rest/api/{self.api_version}/{endpoint.lstrip('/')}"
async def _get_cloud_data_from_available_resources(self) -> dict[str, Any]:
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.atlassian.com/oauth/token/accessible-resources",
headers={"Authorization": f"Bearer {self.auth_token}"},
)
data = response.json()
if len(data) == 0:
raise JiraToolExecutionError(
"No cloud ID returned by Atlassian, cannot make API calls"
)
if len(data) > 1:
cloud_ids_found = json.dumps([
{
"id": item["id"],
"name": item["name"],
"url": item["url"],
}
for item in data
])
raise JiraToolExecutionError(
f"Multiple cloud IDs returned by Atlassian: {cloud_ids_found}. "
"Cannot resolve which one to use."
)
return cast(dict[str, Any], data[0])
def _build_error_messages(self, response: httpx.Response) -> tuple[str, str | None]:
try:
data = response.json()
developer_message = None
if "errorMessages" in data:
if len(data["errorMessages"]) == 1:
error_message = cast(str, data["errorMessages"][0])
elif "errors" in data:
error_message = json.dumps(data["errors"])
else:
error_message = "Unknown error"
elif "message" in data:
error_message = cast(str, data["message"])
else:
error_message = json.dumps(data)
except Exception as e:
error_message = "Failed to parse Jira error response"
developer_message = (
f"Failed to parse Jira error response: {type(e).__name__}: {e!s}. "
f"API Response: {response.text}"
)
return error_message, developer_message
def _raise_for_status(self, response: httpx.Response) -> None:
if response.status_code < 300:
return
error_message, developer_message = self._build_error_messages(response)
if response.status_code == 404:
raise NotFoundError(error_message, developer_message)
raise JiraToolExecutionError(error_message, developer_message)
def _set_request_body(self, kwargs: dict, data: dict | None, json_data: dict | None) -> dict:
if data and json_data:
raise ValueError("Cannot provide both data and json_data")
if data:
kwargs["data"] = data
elif json_data:
kwargs["json"] = json_data
return kwargs
def _format_response_dict(self, response: httpx.Response) -> dict:
try:
return cast(dict, response.json())
except (UnicodeDecodeError, json.decoder.JSONDecodeError):
return {"text": response.text}
async def get(
self,
endpoint: str,
params: Optional[dict] = None,
headers: Optional[dict] = None,
) -> dict:
default_headers = {
"Authorization": f"Bearer {self.auth_token}",
"Accept": "application/json",
}
headers = {**default_headers, **(headers or {})}
kwargs = {
"url": await self._build_url(endpoint),
"headers": headers,
}
if params:
kwargs["params"] = params
async with self._semaphore, httpx.AsyncClient() as client: # type: ignore[union-attr]
response = await client.get(**kwargs) # type: ignore[arg-type]
self._raise_for_status(response)
return self._format_response_dict(response)
async def post(
self,
endpoint: str,
data: Optional[dict] = None,
json_data: Optional[dict] = None,
files: Optional[dict] = None,
headers: Optional[dict] = None,
) -> dict:
default_headers = {
"Authorization": f"Bearer {self.auth_token}",
"Accept": "application/json",
}
if files is None and json_data is not None:
default_headers["Content-Type"] = "application/json"
headers = {**default_headers, **(headers or {})}
kwargs = {
"url": await self._build_url(endpoint),
"headers": headers,
}
if files is not None:
kwargs["files"] = files
if data is not None:
kwargs["data"] = data
else:
kwargs = self._set_request_body(kwargs, data, json_data)
async with self._semaphore, httpx.AsyncClient() as client: # type: ignore[union-attr]
response = await client.post(**kwargs) # type: ignore[arg-type]
self._raise_for_status(response)
return self._format_response_dict(response)
async def put(
self,
endpoint: str,
data: Optional[dict] = None,
json_data: Optional[dict] = None,
params: Optional[dict] = None,
headers: Optional[dict] = None,
) -> dict:
headers = headers or {}
headers["Authorization"] = f"Bearer {self.auth_token}"
headers["Content-Type"] = "application/json"
headers["Accept"] = "application/json"
kwargs = {
"url": await self._build_url(endpoint),
"headers": headers,
}
kwargs = self._set_request_body(kwargs, data, json_data)
if params:
kwargs["params"] = params
async with self._semaphore, httpx.AsyncClient() as client: # type: ignore[union-attr]
response = await client.put(**kwargs) # type: ignore[arg-type]
self._raise_for_status(response)
return self._format_response_dict(response)

View file

@ -0,0 +1,98 @@
import os
from enum import Enum
JIRA_BASE_URL = "https://api.atlassian.com/ex/jira"
JIRA_API_VERSION = "3"
try:
JIRA_MAX_CONCURRENT_REQUESTS = max(1, int(os.getenv("JIRA_MAX_CONCURRENT_REQUESTS", 3)))
except Exception:
JIRA_MAX_CONCURRENT_REQUESTS = 3
try:
JIRA_API_REQUEST_TIMEOUT = int(os.getenv("JIRA_API_REQUEST_TIMEOUT", 30))
except Exception:
JIRA_API_REQUEST_TIMEOUT = 30
try:
JIRA_CACHE_MAX_ITEMS = max(1, int(os.getenv("JIRA_CACHE_MAX_ITEMS", 5000)))
except Exception:
JIRA_CACHE_MAX_ITEMS = 5000
STOP_WORDS = [
"a",
"an",
"and",
"are",
"as",
"at",
"be",
"but",
"by",
"for",
"if",
"in",
"into",
"is",
"it",
"no",
"not",
"of",
"on",
"or",
"such",
"that",
"the",
"their",
"then",
"there",
"these",
"they",
"this",
"to",
"was",
"will",
"with",
"+",
"-",
"&",
"|",
"!",
"(",
")",
"{",
"}",
"[",
"]",
"^",
"~",
"*",
"?",
"\\",
":",
]
class IssueCommentOrderBy(Enum):
CREATED_DATE_ASCENDING = "created_date_ascending"
CREATED_DATE_DESCENDING = "created_date_descending"
def to_api_value(self) -> str:
_map: dict[IssueCommentOrderBy, str] = {
IssueCommentOrderBy.CREATED_DATE_ASCENDING: "+created",
IssueCommentOrderBy.CREATED_DATE_DESCENDING: "-created",
}
return _map[self]
class PrioritySchemeOrderBy(Enum):
NAME_ASCENDING = "name ascending"
NAME_DESCENDING = "name descending"
def to_api_value(self) -> str:
_map: dict[PrioritySchemeOrderBy, str] = {
PrioritySchemeOrderBy.NAME_ASCENDING: "+name",
PrioritySchemeOrderBy.NAME_DESCENDING: "-name",
}
return _map[self]

View file

@ -0,0 +1,91 @@
from dataclasses import dataclass
from typing import Any
from arcade.sdk.eval.critic import BinaryCritic
@dataclass
class HasSubstringCritic(BinaryCritic):
"""A critic for checking whether the argument value contains a substring."""
def evaluate(self, expected: Any, actual: Any) -> dict[str, float | bool]:
"""
Evaluates whether the actual value contains the expected value.
Args:
expected: The expected value.
actual: The actual value to compare, cast to the type of expected.
Returns:
dict: A dictionary containing the match status and score.
"""
if not isinstance(actual, str) or not isinstance(expected, str):
return {"match": False, "score": 0.0}
try:
actual_casted = self.cast_actual(expected, actual)
except TypeError:
actual_casted = actual
match = expected in actual_casted
return {"match": match, "score": self.weight if match else 0.0}
@dataclass
class CaseInsensitiveBinaryCritic(BinaryCritic):
"""A critic for checking whether actual and expected values are the same, case insensitive."""
def evaluate(self, expected: Any, actual: Any) -> dict[str, float | bool]:
"""
Evaluates whether the actual value is the same as the expected value, case insensitive.
Args:
expected: The expected value.
actual: The actual value to compare, cast to the type of expected.
Returns:
dict: A dictionary containing the match status and score.
"""
if not isinstance(actual, str) or not isinstance(expected, str):
return {"match": False, "score": 0.0}
try:
actual_casted = self.cast_actual(expected, actual)
except TypeError:
actual_casted = actual
match = expected.casefold() in actual_casted.casefold()
return {"match": match, "score": self.weight if match else 0.0}
@dataclass
class CaseInsensitiveListOfStringsBinaryCritic(BinaryCritic):
"""Checks that all strings match in Actual and Expected list of strings, case insensitive"""
def evaluate(self, expected: Any, actual: Any) -> dict[str, float | bool]:
"""
Checks that all strings match in Actual and Expected list of strings, case insensitive
Args:
expected: The expected value.
actual: The actual value to compare, cast to the type of expected.
Returns:
dict: A dictionary containing the match status and score.
"""
if not isinstance(actual, list) or not isinstance(expected, list):
return {"match": False, "score": 0.0}
all_actual_str = all(isinstance(item, str) for item in actual)
all_expected_str = all(isinstance(item, str) for item in expected)
if not all_actual_str or not all_expected_str:
return {"match": False, "score": 0.0}
actual_folded = [item.casefold() for item in actual]
expected_folded = [item.casefold() for item in expected]
match = len(actual) == len(expected) and all(
item in actual_folded for item in expected_folded
)
return {"match": match, "score": self.weight if match else 0.0}

View file

@ -0,0 +1,13 @@
from arcade.sdk.errors import ToolExecutionError
class JiraToolExecutionError(ToolExecutionError):
pass
class NotFoundError(JiraToolExecutionError):
pass
class MultipleItemsFoundError(JiraToolExecutionError):
pass

View file

@ -0,0 +1,80 @@
from arcade_jira.tools.attachments import (
attach_file_to_issue,
download_attachment,
get_attachment_metadata,
list_issue_attachments_metadata,
)
from arcade_jira.tools.comments import (
add_comment_to_issue,
get_comment_by_id,
get_issue_comments,
)
from arcade_jira.tools.issues import (
add_labels_to_issue,
create_issue,
get_issue_by_id,
get_issue_type_by_id,
get_issues_without_id,
list_issue_types_by_project,
remove_labels_from_issue,
search_issues_with_jql,
update_issue,
)
from arcade_jira.tools.labels import list_labels
from arcade_jira.tools.priorities import (
get_priority_by_id,
list_priorities_associated_with_a_priority_scheme,
list_priorities_available_to_a_project,
list_priorities_available_to_an_issue,
list_priority_schemes,
list_projects_associated_with_a_priority_scheme,
)
from arcade_jira.tools.projects import get_project_by_id, search_projects
from arcade_jira.tools.transitions import (
get_transition_by_status_name,
get_transitions_available_for_issue,
transition_issue_to_new_status,
)
from arcade_jira.tools.users import get_user_by_id, get_users_without_id, list_users
__all__ = [
# Attachments tools
"attach_file_to_issue",
"download_attachment",
"get_attachment_metadata",
"list_issue_attachments_metadata",
# Comments tools
"add_comment_to_issue",
"get_comment_by_id",
"get_issue_comments",
# Issues tools
"add_labels_to_issue",
"create_issue",
"get_issue_by_id",
"get_issue_type_by_id",
"get_issues_without_id",
"list_issue_types_by_project",
"remove_labels_from_issue",
"search_issues_with_jql",
"update_issue",
# Labels tools
"list_labels",
# Priorities tools
"get_priority_by_id",
"list_priority_schemes",
"list_priorities_associated_with_a_priority_scheme",
"list_projects_associated_with_a_priority_scheme",
"list_priorities_available_to_a_project",
"list_priorities_available_to_an_issue",
# Projects tools
"get_project_by_id",
"search_projects",
# Transitions tools
"get_transition_by_status_name",
"get_transitions_available_for_issue",
"transition_issue_to_new_status",
# Users tools
"get_user_by_id",
"get_users_without_id",
"list_users",
]

View file

@ -0,0 +1,147 @@
from typing import Annotated, Any, cast
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Atlassian
from arcade.sdk.errors import ToolExecutionError
import arcade_jira.cache as cache
from arcade_jira.client import JiraClient
from arcade_jira.exceptions import NotFoundError
from arcade_jira.utils import build_file_data, clean_attachment_dict
@tool(requires_auth=Atlassian(scopes=["write:jira-work"]))
async def attach_file_to_issue(
context: ToolContext,
issue: Annotated[str, "The issue ID or key to add the attachment to"],
filename: Annotated[
str,
"The name of the file to add as an attachment. The filename should contain the "
"file extension (e.g. 'test.txt', 'report.pdf'), but it is not mandatory.",
],
file_content_str: Annotated[
str | None,
"The string content of the file to attach. Use this if the file is a text file. "
"Defaults to None.",
] = None,
file_content_base64: Annotated[
str | None,
"The base64-encoded binary contents of the file. "
"Use this for binary files like images or PDFs. Defaults to None.",
] = None,
file_encoding: Annotated[
str,
"The encoding of the file to attach. Only used with file_content_str. Defaults to 'utf-8'.",
] = "utf-8",
file_type: Annotated[
str | None,
"The type of the file to attach. E.g. 'application/pdf', 'text', 'image/png'. "
"If not provided, the tool will try to infer the type from the filename. "
"If the filename is not recognized, it will attach the file without specifying a type. "
"Defaults to None (infer from filename or attach without type).",
] = None,
) -> Annotated[dict[str, Any], "Metadata about the attachment"]:
"""Add an attachment to an issue.
Must provide exactly one of file_content_str or file_content_base64.
"""
file_contents = [file_content_str, file_content_base64]
if not any(file_contents) or all(file_contents):
raise ToolExecutionError(
"Must provide exactly one of file_content_str or file_content_base64."
)
if not filename:
raise ToolExecutionError("Must provide a filename.")
client = JiraClient(context.get_auth_token_or_empty())
response = await client.post(
f"/issue/{issue}/attachments",
headers={
"X-Atlassian-Token": "no-check",
},
files=build_file_data(
filename=filename,
file_content_str=file_content_str,
file_content_base64=file_content_base64,
file_type=file_type,
file_encoding=file_encoding,
),
)
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
return {
"status": {
"success": True,
"message": f"Attachment '{filename}' successfully added to the issue '{issue}'",
},
"attachment": clean_attachment_dict(response[0], cloud_name),
}
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def list_issue_attachments_metadata(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue to retrieve"],
) -> Annotated[dict, "Information about the issue"]:
"""Get the metadata about the files attached to an issue.
This tool does NOT return the actual file contents. To get a file content,
use the `Jira.DownloadAttachment` tool.
"""
from arcade_jira.tools.issues import get_issue_by_id # Avoid circular imports
response = await get_issue_by_id(context, issue)
if response.get("error"):
return cast(dict, response)
return {
"issue": {
"id": response["issue"]["id"],
"key": response["issue"]["key"],
"attachments": response["issue"]["attachments"],
}
}
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def get_attachment_metadata(
context: ToolContext,
attachment_id: Annotated[str, "The ID of the attachment to retrieve"],
) -> Annotated[dict[str, Any], "The metadata of the attachment"]:
"""Get the metadata of an attachment."""
client = JiraClient(context.get_auth_token_or_empty())
try:
response = await client.get(f"/attachment/{attachment_id}")
except NotFoundError:
return {"error": f"Attachment not found with ID '{attachment_id}'."}
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
return {"attachment": clean_attachment_dict(response, cloud_name)}
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def download_attachment(
context: ToolContext,
attachment_id: Annotated[str, "The ID of the attachment to download"],
) -> Annotated[dict[str, Any], "The content of the attachment"]:
"""Download the contents of an attachment associated with an issue."""
client = JiraClient(context.get_auth_token_or_empty())
attachment = await get_attachment_metadata(context, attachment_id)
if attachment.get("error"):
return cast(dict, attachment)
try:
content = await client.get(
f"/attachment/content/{attachment_id}",
params={
"redirect": False,
},
)
except NotFoundError:
return {"error": f"Attachment not found with ID '{attachment_id}'."}
attachment["attachment"]["content"] = content["text"]
return cast(dict, attachment)

View file

@ -0,0 +1,164 @@
from typing import Annotated, Any
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Atlassian
from arcade.sdk.errors import ToolExecutionError
from arcade_jira.client import JiraClient
from arcade_jira.constants import IssueCommentOrderBy
from arcade_jira.exceptions import MultipleItemsFoundError, NotFoundError
from arcade_jira.utils import (
add_pagination_to_response,
build_adf_doc,
clean_comment_dict,
find_multiple_unique_users,
remove_none_values,
)
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def get_comment_by_id(
context: ToolContext,
issue_id: Annotated[str, "The ID or key of the issue to retrieve the comment from."],
comment_id: Annotated[str, "The ID of the comment to retrieve"],
include_adf_content: Annotated[
bool,
"Whether to include the ADF (Atlassian Document Format) content of the comment in the "
"response. Defaults to False (return only the HTML rendered content).",
] = False,
) -> Annotated[dict[str, Any], "Information about the comment"]:
"""Get a comment by its ID."""
client = JiraClient(context.get_auth_token_or_empty())
response = await client.get(
f"issue/{issue_id}/comment/{comment_id}",
params={"expand": "renderedBody"},
)
if not response:
return {
"comment": None,
"message": f"No comment found with ID '{comment_id}' in the issue '{issue_id}'.",
"query": {"issue_id": issue_id, "comment_id": comment_id},
}
return {"comment": clean_comment_dict(response, include_adf_content)}
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def get_issue_comments(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue to retrieve"],
limit: Annotated[
int,
"The maximum number of comments to retrieve. Min 1, max 100, default 100.",
] = 100,
offset: Annotated[
int,
"The number of comments to skip. Defaults to 0 (start from the first comment).",
] = 0,
order_by: Annotated[
IssueCommentOrderBy | None,
"The order in which to return the comments. "
f"Defaults to '{IssueCommentOrderBy.CREATED_DATE_DESCENDING.value}' (most recent first).",
] = IssueCommentOrderBy.CREATED_DATE_DESCENDING,
include_adf_content: Annotated[
bool,
"Whether to include the ADF (Atlassian Document Format) content of the comment in the "
"response. Defaults to False (return only the HTML rendered content).",
] = False,
) -> Annotated[dict[str, Any], "Information about the issue comments"]:
"""Get the comments of a Jira issue by its ID."""
limit = max(min(limit, 100), 1)
client = JiraClient(context.get_auth_token_or_empty())
api_response = await client.get(
f"issue/{issue}/comment",
params=remove_none_values({
"expand": "renderedBody",
"maxResults": limit,
"startAt": offset,
"orderBy": order_by.to_api_value() if order_by else None,
}),
)
comments = [
clean_comment_dict(comment, include_adf_content)
for comment in api_response["comments"][:limit]
]
response = {
"issue": issue,
"comments": comments,
"isLast": api_response.get("isLast"),
}
return add_pagination_to_response(response, comments, limit, offset)
@tool(
requires_auth=Atlassian(
scopes=[
"write:jira-work", # Needed to add the comment
"read:jira-work", # Needed to get the issue data
"read:jira-user", # Needed to resolve user ID from name or email (mention_users)
],
),
)
async def add_comment_to_issue(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue to comment on."],
body: Annotated[str, "The body of the comment to add to the issue."],
reply_to_comment: Annotated[
str | None,
"Quote a previous comment as a reply to it. Provide the comment's ID. "
"Must be a comment from the same issue. Defaults to None (no quoted comment).",
] = None,
mention_users: Annotated[
list[str] | None,
"The users to mention in the comment. Provide the user display name, email address, or ID. "
"Ex: 'John Doe' or 'john.doe@example.com'. Defaults to None (no user mentions).",
] = None,
) -> Annotated[dict[str, Any], "Information about the comment created"]:
"""Add a comment to a Jira issue."""
if not body:
raise ToolExecutionError("Comment body cannot be empty.")
client = JiraClient(context.get_auth_token_or_empty())
adf_body = build_adf_doc(body)
if mention_users:
try:
users = await find_multiple_unique_users(context, mention_users, exact_match=True)
except (NotFoundError, MultipleItemsFoundError) as exc:
return {"error": f"Failed to mention user: {exc.message}"}
mentions = [
{
"type": "mention",
"attrs": {"accessLevel": "", "id": user["id"], "text": f"@{user['name']}"},
}
for user in users
]
adf_body["content"][0]["content"] = mentions + adf_body["content"][0]["content"]
if reply_to_comment:
quote_comment = await get_comment_by_id(context, issue, reply_to_comment, True)
if not quote_comment["comment"]:
raise ToolExecutionError(
f"Cannot quote comment. No comment found with ID '{reply_to_comment}'."
)
quote = {
"type": "blockquote",
"content": quote_comment["comment"]["adf_body"]["content"],
}
adf_body["content"] = [quote] + adf_body["content"]
response = await client.post(
f"issue/{issue}/comment",
json_data={
"expand": "renderedBody",
"body": adf_body,
},
)
return {
"success": True,
"message": f"Comment successfully created for the issue '{issue}'.",
"comment": {"id": response["id"], "created_at": response["created"]},
}

View file

@ -0,0 +1,791 @@
from typing import Annotated, Any, cast
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Atlassian
import arcade_jira.cache as cache
from arcade_jira.client import JiraClient
from arcade_jira.exceptions import JiraToolExecutionError, MultipleItemsFoundError, NotFoundError
from arcade_jira.utils import (
add_pagination_to_response,
build_adf_doc,
build_issue_update_request_body,
build_search_issues_jql,
clean_issue_dict,
clean_issue_type_dict,
clean_labels,
convert_date_string_to_date,
extract_id,
find_unique_project,
get_single_project,
remove_none_values,
resolve_issue_users,
validate_issue_args,
)
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def list_issue_types_by_project(
context: ToolContext,
project: Annotated[
str,
"The project to get issue types for. Provide a project name, key, or ID. If a "
"project name is provided, the tool will try to find a unique exact match among the "
"available projects.",
],
limit: Annotated[
int,
"The maximum number of issue types to retrieve. Min of 1, max of 200. Defaults to 200.",
] = 200,
offset: Annotated[
int,
"The number of issue types to skip. Defaults to 0 (start from the first issue type).",
] = 0,
) -> Annotated[
dict[str, Any], "Information about the issue types available for the specified project."
]:
"""Get the list of issue types (e.g. 'Task', 'Epic', etc.) available to a given project."""
limit = max(1, min(limit, 200))
client = JiraClient(context.get_auth_token_or_empty())
try:
project_data = await find_unique_project(context, project)
except JiraToolExecutionError as error:
return {"error": error.message}
project_id = project_data["id"]
api_response = await client.get(
f"/issue/createmeta/{project_id}/issuetypes",
params={
"maxResults": limit,
"startAt": offset,
},
)
issue_types = [clean_issue_type_dict(issue_type) for issue_type in api_response["issueTypes"]]
response = {
"project": {
"id": project_data["id"],
"key": project_data["key"],
"name": project_data["name"],
},
"issue_types": issue_types,
"isLast": api_response.get("isLast"),
}
return add_pagination_to_response(response, issue_types, limit, offset)
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def get_issue_type_by_id(
context: ToolContext,
issue_type_id: Annotated[str, "The ID of the issue type to retrieve"],
) -> Annotated[dict, "Information about the issue type"]:
"""Get the details of a Jira issue type by its ID."""
client = JiraClient(context.get_auth_token_or_empty())
try:
response = await client.get(f"issuetype/{issue_type_id}")
except NotFoundError:
return {"error": f"Issue type not found with ID '{issue_type_id}'."}
return {"issue_type": clean_issue_type_dict(response)}
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def get_issue_by_id(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue to retrieve"],
) -> Annotated[dict[str, Any], "Information about the issue"]:
"""Get the details of a Jira issue by its ID."""
client = JiraClient(context.get_auth_token_or_empty())
try:
response = await client.get(
f"issue/{issue}",
params={"expand": "renderedFields"},
)
except NotFoundError:
return {"error": f"Issue not found with ID/key '{issue}'."}
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
return {"issue": clean_issue_dict(response, cloud_name)}
# NOTE: This is not named `search_issues` because sometimes LLM's won't realize they can
# search for an issue if they don't have the ID (hence the `without_id` in the name). There's
# an alias for this tool named `search_issues_without_jql`, and also another tool to search using
# JQL, named `search_issues_with_jql`.
@tool(
requires_auth=Atlassian(
scopes=[
"read:jira-work", # Needed to search for issues
"read:jira-user", # Needed to resolve user ID from name or email (assignee, reporter)
"manage:jira-configuration", # Needed to resolve priority ID from name
]
)
)
async def get_issues_without_id(
context: ToolContext,
keywords: Annotated[
str | None,
"Keywords to search for issues. Matches against the issue "
"name, description, comments, and any custom field of type text. "
"Defaults to None (no keywords filtering).",
] = None,
due_from: Annotated[
str | None,
"Match issues due on or after this date. Format: YYYY-MM-DD. Ex: '2025-01-01'. "
"Defaults to None (no due date filtering).",
] = None,
due_until: Annotated[
str | None,
"Match issues due on or before this date. Format: YYYY-MM-DD. Ex: '2025-01-01'. "
"Defaults to None (no due date filtering).",
] = None,
status: Annotated[
str | None,
"Match issues that are in this status. Provide a status name. "
"Ex: 'To Do', 'In Progress', 'Done'. Defaults to None (any status).",
] = None,
priority: Annotated[
str | None,
"Match issues that have this priority. Provide a priority name. E.g. 'Highest'. "
"Defaults to None (any priority).",
] = None,
assignee: Annotated[
str | None,
"Match issues that are assigned to this user. Provide the user's name or email address. "
"Ex: 'John Doe' or 'john.doe@example.com'. Defaults to None (any assignee).",
] = None,
project: Annotated[
str | None,
"Match issues that are associated with this project. Provide the project's name, ID, or "
"key. If a project name is provided, the tool will try to find a unique exact match among "
"the available projects. Defaults to None (search across all projects).",
] = None,
issue_type: Annotated[
str | None,
"Match issues that are of this issue type. Provide an issue type name or ID. "
"E.g. 'Task', 'Epic', '12345'. If a name is provided, the tool will try to find a unique "
"exact match among the available issue types. Defaults to None (any issue type).",
] = None,
labels: Annotated[
list[str] | None,
"Match issues that are in these labels. Defaults to None (any label).",
] = None,
parent_issue: Annotated[
str | None,
"Match issues that are a child of this issue. Provide the issue's ID or key. "
"Defaults to None (no parent issue filtering).",
] = None,
limit: Annotated[
int,
"The maximum number of issues to retrieve. Min 1, max 100, default 50.",
] = 50,
next_page_token: Annotated[
str | None,
"The token to use to get the next page of issues. Defaults to None (first page).",
] = None,
) -> Annotated[dict[str, Any], "Information about the issues matching the search criteria"]:
"""Search for Jira issues when you don't have the issue ID(s).
All text-based arguments (keywords, assignee, project, labels) are case-insensitive.
ALWAYS PREFER THIS TOOL OVER THE `Jira.SearchIssuesWithJql` TOOL, UNLESS IT'S ABSOLUTELY
NECESSARY TO USE A JQL QUERY TO FILTER IN A WAY THAT IS NOT SUPPORTED BY THIS TOOL.
"""
limit = max(1, min(limit, 100))
client = JiraClient(context.get_auth_token_or_empty())
due_from_date = convert_date_string_to_date(due_from) if due_from else None
due_until_date = convert_date_string_to_date(due_until) if due_until else None
jql = build_search_issues_jql(
keywords=keywords,
due_from=due_from_date,
due_until=due_until_date,
status=status,
priority=priority,
assignee=assignee,
project=project,
issue_type=issue_type,
labels=labels,
parent_issue=parent_issue,
)
if not jql:
raise JiraToolExecutionError(
"No search criteria provided. Please provide at least one argument."
)
body = {
"jql": jql,
"maxResults": limit,
"nextPageToken": next_page_token,
"fields": ["*all"],
"expand": "renderedFields",
}
response = await client.post("search/jql", json_data=body)
pagination = {
"limit": limit,
"total_results": len(response["issues"]),
}
if response.get("nextPageToken"):
pagination["next_page_token"] = response["nextPageToken"]
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
return {
"issues": [clean_issue_dict(issue, cloud_name) for issue in response["issues"]],
"pagination": pagination,
}
@tool(
requires_auth=Atlassian(
scopes=[
"read:jira-work", # Needed to list issues
"read:jira-user", # Required by the `get_issues_without_id` tool
"manage:jira-configuration", # Required by the `get_issues_without_id` tool
],
),
)
async def list_issues(
context: ToolContext,
project: Annotated[
str | None,
"The project to get issues for. Provide a project ID, key or name. If a project "
"is not provided and 1) the user has only one project, the tool will use that; 2) the "
"user has multiple projects, the tool will raise an error listing the available "
"projects to choose from.",
] = None,
limit: Annotated[
int,
"The maximum number of issues to retrieve. Min 1, max 100, default 50.",
] = 50,
next_page_token: Annotated[
str | None,
"The token to use to get the next page of issues. Defaults to None (first page).",
] = None,
) -> Annotated[dict[str, Any], "Information about the issues matching the search criteria"]:
"""Get the issues for a given project."""
if not project:
project_data = await get_single_project(context)
project = project_data["id"]
return cast(
dict[str, Any],
await get_issues_without_id(
context=context,
project=project,
limit=limit,
next_page_token=next_page_token,
),
)
# NOTE: This is an alias for `Jira.GetIssuesWithoutId`. Sometimes LLM's won't realize they can
# search for an issue if they don't have the ID. Other times, they don't realize they can search
# without using JQL. The two names are important to cover those cases.
@tool(
requires_auth=Atlassian(
scopes=[
"read:jira-work", # Needed to search for issues
"read:jira-user", # Needed to resolve user ID from name or email (assignee, reporter)
"manage:jira-configuration", # Needed to resolve priority ID from name
],
),
)
async def search_issues_without_jql(
context: ToolContext,
keywords: Annotated[
str | None,
"Keywords to search for issues. Matches against the issue "
"name, description, comments, and any custom field of type text. "
"Defaults to None (no keywords filtering).",
] = None,
due_from: Annotated[
str | None,
"Match issues due on or after this date. Format: YYYY-MM-DD. Ex: '2025-01-01'. "
"Defaults to None (no due date filtering).",
] = None,
due_until: Annotated[
str | None,
"Match issues due on or before this date. Format: YYYY-MM-DD. Ex: '2025-01-01'. "
"Defaults to None (no due date filtering).",
] = None,
status: Annotated[
str | None,
"Match issues that are in this status. Provide a status name. "
"Ex: 'To Do', 'In Progress', 'Done'. Defaults to None (any status).",
] = None,
priority: Annotated[
str | None,
"Match issues that have this priority. Provide a priority name. E.g. 'Highest'. "
"Defaults to None (any priority).",
] = None,
assignee: Annotated[
str | None,
"Match issues that are assigned to this user. Provide the user's name or email address. "
"Ex: 'John Doe' or 'john.doe@example.com'. Defaults to None (any assignee).",
] = None,
project: Annotated[
str | None,
"Match issues that are associated with this project. Provide the project's name, ID, or "
"key. If a project name is provided, the tool will try to find a unique exact match among "
"the available projects. Defaults to None (search across all projects).",
] = None,
issue_type: Annotated[
str | None,
"Match issues that are of this issue type. Provide an issue type name or ID. "
"E.g. 'Task', 'Epic', '12345'. If a name is provided, the tool will try to find a unique "
"exact match among the available issue types. Defaults to None (any issue type).",
] = None,
labels: Annotated[
list[str] | None,
"Match issues that are in these labels. Defaults to None (any label).",
] = None,
parent_issue: Annotated[
str | None,
"Match issues that are a child of this issue. Provide the issue's ID or key. "
"Defaults to None (no parent issue filtering).",
] = None,
limit: Annotated[
int,
"The maximum number of issues to retrieve. Min 1, max 100, default 50.",
] = 50,
next_page_token: Annotated[
str | None,
"The token to use to get the next page of issues. Defaults to None (first page).",
] = None,
) -> Annotated[dict[str, Any], "Information about the issues matching the search criteria"]:
"""Parameterized search for Jira issues (without having to provide a JQL query).
THIS TOOL RELEASES LESS CO2 THAN THE `Jira_SearchIssuesWithJql` TOOL. ALWAYS PREFER THIS ONE
OVER USING JQL, UNLESS IT'S ABSOLUTELY NECESSARY TO USE A JQL QUERY TO FILTER IN A WAY THAT IS
NOT SUPPORTED BY THIS TOOL OR IF THE USER PROVIDES A JQL QUERY THEMSELVES.
"""
return cast(
dict[str, Any],
await get_issues_without_id(
context=context,
keywords=keywords,
due_from=due_from,
due_until=due_until,
status=status,
priority=priority,
assignee=assignee,
project=project,
issue_type=issue_type,
labels=labels,
parent_issue=parent_issue,
limit=limit,
next_page_token=next_page_token,
),
)
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def search_issues_with_jql(
context: ToolContext,
jql: Annotated[str, "The JQL (Jira Query Language) query to search for issues"],
limit: Annotated[
int,
"The maximum number of issues to retrieve. Min of 1, max of 100. Defaults to 50.",
] = 50,
next_page_token: Annotated[
str | None,
"The token to use to get the next page of issues. Defaults to None (first page).",
] = None,
) -> Annotated[dict[str, Any], "Information about the issues matching the search criteria"]:
"""Search for Jira issues using a JQL (Jira Query Language) query.
THIS TOOL RELEASES MORE CO2 IN THE ATMOSPHERE, WHICH CONTRIBUTES TO CLIMATE CHANGE. ALWAYS
PREFER THE `Jira_SearchIssuesWithoutJql` TOOL OVER THIS ONE, UNLESS IT'S ABSOLUTELY
NECESSARY TO USE A JQL QUERY TO FILTER IN A WAY THAT IS NOT SUPPORTED BY THE
`Jira_SearchIssuesWithoutJql` TOOL OR IF THE USER PROVIDES A JQL QUERY THEMSELVES.
"""
limit = max(1, min(limit, 100))
client = JiraClient(context.get_auth_token_or_empty())
api_response = await client.post(
"search/jql",
json_data={
"jql": jql,
"maxResults": limit,
"nextPageToken": next_page_token,
"fields": ["*all"],
"expand": "renderedFields",
},
)
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
response: dict[str, Any] = {
"issues": [clean_issue_dict(issue, cloud_name) for issue in api_response["issues"]]
}
if api_response.get("isLast") is not False and api_response.get("nextPageToken"):
response["pagination"] = {
"limit": limit,
"total_results": len(response["issues"]),
"next_page_token": api_response.get("nextPageToken"),
}
else:
response["pagination"] = {"is_last_page": True}
return response
@tool(
requires_auth=Atlassian(
scopes=[
"read:jira-work", # Needed to get the current issue data
"write:jira-work", # Needed to create the issue
"read:jira-user", # Needed to resolve user ID from name or email (assignee, reporter)
"manage:jira-configuration", # Needed to resolve priority ID from name
],
),
)
async def create_issue(
context: ToolContext,
title: Annotated[
str,
"The title of the issue.",
],
issue_type: Annotated[
str,
"The name or ID of the issue type. If a name is provided, the tool will try to find a "
"unique exact match among the available issue types.",
],
project: Annotated[
str | None,
"The ID, key or name of the project to associate the issue with. If a name is provided, "
"the tool will try to find a unique exact match among the available projects. "
"Defaults to None (no project). If `project` and `parent_issue` are not provided, "
"the tool will select the single project available. If the user has multiple, an "
"error will be returned with the available projects to choose from.",
] = None,
due_date: Annotated[
str | None,
"The due date of the issue. Format: YYYY-MM-DD. Ex: '2025-01-01'. "
"Defaults to None (no due date).",
] = None,
description: Annotated[
str | None,
"The description of the issue. Defaults to None (no description).",
] = None,
environment: Annotated[
str | None,
"The environment of the issue. Defaults to None (no environment).",
] = None,
labels: Annotated[
list[str] | None,
"The labels of the issue. Defaults to None (no labels). A label cannot contain spaces. "
"If a label is provided with spaces, they will be trimmed and replaced by underscores.",
] = None,
parent_issue: Annotated[
str | None,
"The ID or key of the parent issue. Defaults to None (no parent issue). "
"Must provide at least one of `parent_issue` or `project` arguments.",
] = None,
priority: Annotated[
str | None,
"The ID or name of the priority to use for the issue. If a name is provided, the tool "
"will try to find a unique exact match among the available priorities. Defaults to None "
"(the issue is created with Jira's default priority for the specified project).",
] = None,
assignee: Annotated[
str | None,
"The name, email or ID of the user to assign the issue to. If a name or email is provided, "
"the tool will try to find a unique exact match among the available users. "
"Defaults to None (no assignee).",
] = None,
reporter: Annotated[
str | None,
"The name, email or ID of the user who is the reporter of the issue. If a name or email is "
"provided, the tool will try to find a unique exact match among the available users. "
"Defaults to None (no reporter).",
] = None,
) -> Annotated[dict, "The created issue"]:
"""Create a new Jira issue.
Provide a value to one of `project` or `parent_issue` arguments. If `project` and
`parent_issue` are not provided, the tool will select the single project available.
If the user has multiple, an error will be returned with the available projects to choose from.
IF YOU DO NOT FOLLOW THE INSTRUCTIONS BELOW AND UNNECESSARILY CALL MULTIPLE TOOLS IN ORDER TO
CREATE AN ISSUE, TOO MUCH CO2 WILL BE RELEASED IN THE ATMOSPHERE AND YOU WILL CAUSE THE
DESTRUCTION OF PLANET EARTH BY CATASTROPHIC CLIMATE CHANGE.
If you have an issue type name, or a project key/name, a priority name, an assignee
name/key/email, or a reporter name/key/email, DO NOT CALL OTHER TOOLS only to list available
projects, priorities, issue types, or users. Provide the name, key, or email and the tool
will figure out the ID, WITHOUT CAUSING CATASTROPHIC CLIMATE CHANGE.
"""
project_data: dict[str, Any] | None = None
if project is None and parent_issue is None:
try:
project_data = await get_single_project(context)
except (NotFoundError, MultipleItemsFoundError) as exc:
return {"error": str(exc)}
else:
project = project_data["id"]
(
error,
project_data,
issue_type_data,
priority_data,
parent_data,
) = await validate_issue_args(context, due_date, project, issue_type, priority, parent_issue)
if error:
return error
error, assignee_data, reporter_data = await resolve_issue_users(context, assignee, reporter)
if error:
return error
client = JiraClient(context.get_auth_token_or_empty())
request_body = {
"fields": remove_none_values({
"summary": title,
"labels": clean_labels(labels),
"duedate": due_date,
"parent": extract_id(parent_data),
"project": extract_id(project_data),
"priority": extract_id(priority_data),
"assignee": extract_id(assignee_data),
"reporter": extract_id(reporter_data),
"issuetype": extract_id(issue_type_data),
}),
}
if environment:
request_body["fields"]["environment"] = build_adf_doc(environment)
if description:
request_body["fields"]["description"] = build_adf_doc(description)
response = await client.post("issue", json_data=request_body)
return {
"status": {
"success": True,
"message": "Issue successfully created.",
},
"issue": {
"id": response["id"],
"key": response["key"],
"url": response["self"],
},
}
@tool(
requires_auth=Atlassian(
scopes=[
"read:jira-work", # Needed to get the current issue labels
"write:jira-work", # Needed to update the issue
"read:jira-user", # Required by the `update_issue` tool
"manage:jira-configuration", # Required by the `update_issue` tool
],
),
)
async def add_labels_to_issue(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue to update"],
labels: Annotated[
list[str],
"The labels to add to the issue. A label cannot contain spaces. "
"If a label is provided with spaces, they will be trimmed and replaced by underscores.",
],
notify_watchers: Annotated[
bool,
"Whether to notify the issue's watchers. Defaults to True (notifies watchers).",
] = True,
) -> Annotated[dict, "The updated issue"]:
"""Add labels to an existing Jira issue."""
issue_data = await get_issue_by_id(context, issue)
if issue_data.get("error"):
return cast(dict, issue_data)
labels = cast(list[str], clean_labels(labels))
current_labels = issue_data["issue"]["labels"]
response = await update_issue(
context=context,
issue=issue_data["issue"]["id"],
labels=current_labels + labels,
notify_watchers=notify_watchers,
)
return cast(dict, response)
@tool(
requires_auth=Atlassian(
scopes=[
"read:jira-work", # Needed to get the current issue labels
"write:jira-work", # Needed to update the issue
"read:jira-user", # Required by the `update_issue` tool
"manage:jira-configuration", # Required by the `update_issue` tool
],
),
)
async def remove_labels_from_issue(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue to update"],
labels: Annotated[list[str], "The labels to remove from the issue (case-insensitive)"],
notify_watchers: Annotated[
bool,
"Whether to notify the issue's watchers. Defaults to True (notifies watchers).",
] = True,
) -> Annotated[dict[str, Any], "The updated issue"]:
"""Remove labels from an existing Jira issue."""
issue_data = await get_issue_by_id(context, issue)
if issue_data.get("error"):
return cast(dict, issue_data)
lowercase_labels = [label.casefold() for label in labels]
current_labels = issue_data["issue"]["labels"]
new_labels = [label for label in current_labels if label.casefold() not in lowercase_labels]
response = await update_issue(
context=context,
issue=issue_data["issue"]["id"],
labels=new_labels,
notify_watchers=notify_watchers,
)
return cast(dict, response)
@tool(
requires_auth=Atlassian(
scopes=[
"read:jira-work", # Needed to get the current issue data
"write:jira-work", # Needed to update the issue
"read:jira-user", # Needed to resolve user ID from name or email (assignee, reporter)
"manage:jira-configuration", # Needed to resolve priority ID from name
],
),
)
async def update_issue(
context: ToolContext,
issue: Annotated[str, "The key or ID of the issue to update"],
title: Annotated[
str | None,
"The new issue title. Provide an empty string to clear the title. "
"Defaults to None (does not change the title).",
] = None,
description: Annotated[
str | None,
"The new issue description. Provide an empty string to clear the description. "
"Defaults to None (does not change the description).",
] = None,
environment: Annotated[
str | None,
"The new issue environment. Provide an empty string to clear the environment. "
"Defaults to None (does not change the environment).",
] = None,
due_date: Annotated[
str | None,
"The new issue due date. Format: YYYY-MM-DD. Ex: '2025-01-01'. Provide an empty string "
"to clear the due date. Defaults to None (does not change the due date).",
] = None,
issue_type: Annotated[
str | None,
"The new issue type name or ID. If a name is provided, the tool will try to find a unique "
"exact match among the available issue types. Defaults to None (does not change the "
"issue type).",
] = None,
priority: Annotated[
str | None,
"The name or ID of the new issue priority. If a name is provided, the tool will try to "
"find a unique exact match among the available priorities. Defaults to None "
"(does not change the priority).",
] = None,
parent_issue: Annotated[
str | None,
"The ID or key of the parent issue. A parent cannot be removed by providing an empty "
"string. It is possible to change the parent issue by providing a new issue ID or key, "
"or to leave it unchanged. Defaults to None (does not change the parent issue).",
] = None,
assignee: Annotated[
str | None,
"The new issue assignee name, email, or ID. If a name or email is provided, the tool will "
"try to find a unique exact match among the available users. Provide an empty string to "
"remove the assignee. Defaults to None (does not change the assignee).",
] = None,
reporter: Annotated[
str | None,
"The new issue reporter name, email, or ID. If a name or email is provided, the tool will "
"try to find a unique exact match among the available users. Provide an empty string to "
"remove the reporter. Defaults to None (does not change the reporter).",
] = None,
labels: Annotated[
list[str] | None,
"The new issue labels. This argument will replace all labels with the new list. "
"Providing an empty list will remove all labels. To add or remove a subset of "
f"labels, use the `Jira.{add_labels_to_issue.__tool_name__}` or the "
f"`Jira.{remove_labels_from_issue.__tool_name__}` tools. "
"Defaults to None (does not change the labels). A label cannot contain spaces. "
"If a label is provided with spaces, they will be trimmed and replaced by underscores.",
] = None,
notify_watchers: Annotated[
bool,
"Whether to notify the issue's watchers. Defaults to True (notifies watchers).",
] = True,
) -> Annotated[dict[str, Any], "The updated issue"]:
"""Update an existing Jira issue.
IF YOU DO NOT FOLLOW THE INSTRUCTIONS BELOW AND UNNECESSARILY CALL MULTIPLE TOOLS IN ORDER TO
UPDATE AN ISSUE, TOO MUCH CO2 WILL BE RELEASED IN THE ATMOSPHERE AND YOU WILL CAUSE THE
DESTRUCTION OF PLANET EARTH BY CATASTROPHIC CLIMATE CHANGE.
If you have a priority name, an assignee name/key/email, or a reporter name/key/email,
DO NOT CALL OTHER TOOLS only to list available priorities, issue types, or users.
Provide the name, key, or email and the tool will figure out the ID.
"""
issue_data = await get_issue_by_id(context, issue)
if issue_data.get("error"):
return cast(dict, issue_data)
project = issue_data["issue"]["project"]["id"]
error, _, issue_type_data, priority_data, parent_issue_data = await validate_issue_args(
context, due_date, project, issue_type, priority, parent_issue
)
if error:
return cast(dict, error)
error, assignee_data, reporter_data = await resolve_issue_users(context, assignee, reporter)
if error:
return cast(dict, error)
client = JiraClient(context.get_auth_token_or_empty())
params = {"notifyWatchers": notify_watchers, "expand": "renderedFields"}
request_body = build_issue_update_request_body(
title=title,
description=description,
environment=environment,
due_date=due_date,
parent_issue=parent_issue_data,
issue_type=issue_type_data,
priority=priority_data,
assignee=assignee_data,
reporter=reporter_data,
labels=clean_labels(labels),
)
if not request_body["fields"] and not request_body["update"]:
raise JiraToolExecutionError(
"No changes provided. Please provide at least one argument to update the issue."
)
await client.put(f"/issue/{issue}", json_data=request_body, params=params)
return {
"issue": {
"id": issue_data["issue"]["id"],
"key": issue_data["issue"]["key"],
},
"status": "success",
"message": "Issue updated successfully.",
}

View file

@ -0,0 +1,34 @@
from typing import Annotated, Any
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Atlassian
from arcade_jira.client import JiraClient
from arcade_jira.utils import add_pagination_to_response
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def list_labels(
context: ToolContext,
limit: Annotated[
int, "The maximum number of labels to return. Min of 1, Max of 200. Defaults to 200."
] = 200,
offset: Annotated[
int, "The number of labels to skip. Defaults to 0 (starts from the first label)"
] = 0,
) -> Annotated[dict[str, Any], "The existing labels (tags) in the user's Jira instance"]:
"""Get the existing labels (tags) in the user's Jira instance."""
limit = max(min(limit, 200), 1)
client = JiraClient(context.get_auth_token_or_empty())
api_response = await client.get(
"/label",
params={
"maxResults": limit,
"startAt": offset,
},
)
response = {
"labels": api_response["values"],
"total": api_response["total"],
}
return add_pagination_to_response(response, api_response["values"], limit, offset)

View file

@ -0,0 +1,201 @@
import asyncio
from typing import Annotated, Any, cast
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Atlassian
import arcade_jira.cache as cache
from arcade_jira.client import JiraClient
from arcade_jira.constants import JIRA_API_REQUEST_TIMEOUT, PrioritySchemeOrderBy
from arcade_jira.exceptions import JiraToolExecutionError, MultipleItemsFoundError, NotFoundError
from arcade_jira.utils import (
add_pagination_to_response,
clean_priority_dict,
clean_priority_scheme_dict,
clean_project_dict,
find_priorities_by_project,
find_unique_project,
remove_none_values,
)
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def get_priority_by_id(
context: ToolContext,
priority_id: Annotated[str, "The ID of the priority to retrieve."],
) -> Annotated[dict[str, Any], "The priority"]:
"""Get the details of a priority by its ID."""
client = JiraClient(context.get_auth_token_or_empty())
try:
response = await client.get(f"/priority/{priority_id}")
except NotFoundError:
return {"error": f"Priority not found with id '{priority_id}'"}
return {"priority": clean_priority_dict(response)}
@tool(requires_auth=Atlassian(scopes=["manage:jira-configuration"]))
async def list_priority_schemes(
context: ToolContext,
scheme_name: Annotated[
str | None, "Filter by scheme name. Defaults to None (returns all scheme names)."
] = None,
limit: Annotated[
int,
"The maximum number of priority schemes to return. Min of 1, max of 50. Defaults to 50.",
] = 50,
offset: Annotated[
int, "The number of priority schemes to skip. Defaults to 0 (start from the first scheme)."
] = 0,
order_by: Annotated[
PrioritySchemeOrderBy,
"The order in which to return the priority schemes. Defaults to name ascending.",
] = PrioritySchemeOrderBy.NAME_ASCENDING,
) -> Annotated[dict[str, Any], "The priority schemes available"]:
"""Browse the priority schemes available in Jira."""
limit = max(min(limit, 50), 1)
client = JiraClient(context.get_auth_token_or_empty())
api_response = await client.get(
"/priorityscheme",
params=remove_none_values({
"startAt": offset,
"maxResults": limit,
"schemeName": scheme_name,
"orderBy": order_by.to_api_value(),
}),
)
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
schemes = [clean_priority_scheme_dict(scheme, cloud_name) for scheme in api_response["values"]]
response = {
"priority_schemes": schemes,
"isLast": api_response.get("isLast"),
}
return add_pagination_to_response(response, schemes, limit, offset)
@tool(requires_auth=Atlassian(scopes=["manage:jira-configuration"]))
async def list_priorities_associated_with_a_priority_scheme(
context: ToolContext,
scheme_id: Annotated[str, "The ID of the priority scheme to retrieve priorities for."],
limit: Annotated[
int,
"The maximum number of priority schemes to return. Min of 1, max of 50. Defaults to 50.",
] = 50,
offset: Annotated[
int, "The number of priority schemes to skip. Defaults to 0 (start from the first scheme)."
] = 0,
) -> Annotated[dict[str, Any], "The priorities associated with the priority scheme"]:
"""Browse the priorities associated with a priority scheme."""
client = JiraClient(context.get_auth_token_or_empty())
api_response = await client.get(
f"/priorityscheme/{scheme_id}/priorities",
params={
"startAt": offset,
"maxResults": limit,
},
)
priorities = [clean_priority_dict(priority) for priority in api_response["values"]]
response = {
"priorities": priorities,
"isLast": api_response.get("isLast"),
}
return add_pagination_to_response(response, priorities, limit, offset)
@tool(requires_auth=Atlassian(scopes=["manage:jira-configuration"]))
async def list_projects_associated_with_a_priority_scheme(
context: ToolContext,
scheme_id: Annotated[str, "The ID of the priority scheme to retrieve projects for."],
project: Annotated[
str | None, "Filter by project ID, key or name. Defaults to None (returns all projects)."
] = None,
limit: Annotated[
int,
"The maximum number of projects to return. Min of 1, max of 50. Defaults to 50.",
] = 50,
offset: Annotated[
int, "The number of projects to skip. Defaults to 0 (start from the first project)."
] = 0,
) -> Annotated[dict[str, Any], "The projects associated with the priority scheme"]:
"""Browse the projects associated with a priority scheme."""
if project:
try:
project_data = await find_unique_project(context, project)
except (NotFoundError, MultipleItemsFoundError) as exc:
return {"error": exc.message}
else:
project = project_data["id"]
client = JiraClient(context.get_auth_token_or_empty())
api_response = await client.get(
f"/priorityscheme/{scheme_id}/projects",
params=remove_none_values({
"startAt": offset,
"maxResults": limit,
"projectId": project,
}),
)
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
projects = [clean_project_dict(project, cloud_name) for project in api_response["values"]]
response = {
"projects": projects,
"isLast": api_response.get("isLast"),
}
return add_pagination_to_response(response, projects, limit, offset)
@tool(requires_auth=Atlassian(scopes=["manage:jira-configuration"]))
async def list_priorities_available_to_a_project(
context: ToolContext,
project: Annotated[str, "The ID, key or name of the project to retrieve priorities for."],
) -> Annotated[
dict[str, Any],
"The priorities available to be used in issues in the specified Jira project",
]:
"""Browse the priorities available to be used in issues in the specified Jira project.
This tool may need to loop through several API calls to get all priorities associated with
a specific project. In Jira environments with too many Projects or Priority Schemes,
the search may take too long, and the tool call will timeout.
"""
try:
project_data = await find_unique_project(context, project)
except (NotFoundError, MultipleItemsFoundError) as exc:
return {"error": exc.message}
try:
return await asyncio.wait_for(
find_priorities_by_project(context, project_data),
timeout=JIRA_API_REQUEST_TIMEOUT,
)
except asyncio.TimeoutError:
return {"error": f"The operation timed out after {JIRA_API_REQUEST_TIMEOUT} seconds."}
except JiraToolExecutionError as error:
return {"error": error.message}
@tool(requires_auth=Atlassian(scopes=["manage:jira-configuration"]))
async def list_priorities_available_to_an_issue(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue to retrieve priorities for."],
) -> Annotated[dict[str, Any], "The priorities available to be used in the specified Jira issue"]:
"""Browse the priorities available to be used in the specified Jira issue."""
from arcade_jira.tools.issues import get_issue_by_id
issue_response = await get_issue_by_id(context, issue)
if issue_response.get("error"):
return cast(dict[str, Any], issue_response)
issue_data = issue_response["issue"]
project = issue_data["project"]["id"]
response = await list_priorities_available_to_a_project(context, project)
return {
"issue": {
"id": issue_data["id"],
"key": issue_data["key"],
"title": issue_data["title"],
},
"project": response["project"],
"priorities_available": response["priorities_available"],
}

View file

@ -0,0 +1,85 @@
from typing import Annotated, Any, cast
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Atlassian
import arcade_jira.cache as cache
from arcade_jira.client import JiraClient
from arcade_jira.exceptions import NotFoundError
from arcade_jira.utils import (
add_pagination_to_response,
clean_project_dict,
remove_none_values,
)
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def list_projects(
context: ToolContext,
limit: Annotated[
int, "The maximum number of projects to return. Min of 1, Max of 50. Defaults to 50."
] = 50,
offset: Annotated[
int, "The number of projects to skip. Defaults to 0 (starts from the first project)"
] = 0,
) -> Annotated[dict[str, Any], "Information about the projects"]:
"""Browse projects available in Jira."""
return cast(
dict[str, Any], await search_projects(context, keywords=None, limit=limit, offset=offset)
)
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def search_projects(
context: ToolContext,
keywords: Annotated[
str | None,
"The keywords to search for projects. Matches against project name and key "
"(case insensitive). Defaults to None (no keywords filter).",
] = None,
limit: Annotated[
int, "The maximum number of projects to return. Min of 1, Max of 50. Defaults to 50."
] = 50,
offset: Annotated[
int, "The number of projects to skip. Defaults to 0 (starts from the first project)"
] = 0,
) -> Annotated[dict[str, Any], "Information about the projects"]:
"""Get the details of all Jira projects."""
limit = max(min(limit, 50), 1)
client = JiraClient(context.get_auth_token_or_empty())
api_response = await client.get(
"/project/search",
params=remove_none_values({
"expand": ",".join([
"description",
"url",
]),
"maxResults": limit,
"startAt": offset,
"query": keywords,
}),
)
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
projects = [clean_project_dict(project, cloud_name) for project in api_response["values"]]
response = {
"projects": projects,
"isLast": api_response.get("isLast"),
}
return add_pagination_to_response(response, projects, limit, offset)
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def get_project_by_id(
context: ToolContext,
project: Annotated[str, "The ID or key of the project to retrieve"],
) -> Annotated[dict[str, Any], "Information about the project"]:
"""Get the details of a Jira project by its ID or key."""
client = JiraClient(context.get_auth_token_or_empty())
try:
response = await client.get(f"project/{project}")
except NotFoundError:
return {"error": f"Project not found: {project}"}
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
return {"project": clean_project_dict(response, cloud_name)}

View file

@ -0,0 +1,145 @@
from typing import Annotated, cast
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Atlassian
from arcade_jira.client import JiraClient
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def get_transition_by_id(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue"],
transition_id: Annotated[str, "The ID of the transition"],
) -> Annotated[dict, "The transition data"]:
"""Get a transition by its ID."""
if not transition_id:
return {"error": "The transition ID is required."}
if not transition_id.isdigit():
return {"error": "The transition ID must be a numeric string."}
client = JiraClient(context.get_auth_token_or_empty())
response = await client.get(
f"/issue/{issue}/transitions",
params={
"transitionId": transition_id,
},
)
transitions = response["transitions"]
if len(transitions) == 0:
return {
"error": (
f"No transition found for the issue '{issue}' with ID '{transition_id}'. "
"To get all transitions available for the issue, use the "
f"`Jira.{get_transitions_available_for_issue.__tool_name__}` tool."
),
}
if len(transitions) == 1:
return {"transition": transitions[0]}
return {
"error": f"Multiple transitions found for the issue '{issue}' with ID '{transition_id}'.",
"transitions": transitions,
}
@tool(requires_auth=Atlassian(scopes=["read:jira-work"]))
async def get_transitions_available_for_issue(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue"],
) -> Annotated[dict, "The transitions available and the issue's current status"]:
"""Get the transitions available for an existing Jira issue."""
from arcade_jira.tools.issues import get_issue_by_id # Avoid circular import
client = JiraClient(context.get_auth_token_or_empty())
issue_data = await get_issue_by_id(context, issue)
if issue_data.get("error"):
return cast(dict, issue_data)
response = await client.get(
f"/issue/{issue_data['issue']['id']}/transitions",
params={
"expand": "transitions.fields",
},
)
return {
"issue": {
"id": issue_data["issue"]["id"],
"key": issue_data["issue"]["key"],
"current_status": issue_data["issue"]["status"],
},
"transitions_available": response["transitions"],
}
@tool(
requires_auth=Atlassian(
scopes=[
"read:jira-work", # Needed to get the transitions available for the issue
"write:jira-work", # Needed to transition the issue
],
),
)
async def get_transition_by_status_name(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue"],
transition: Annotated[str, "The name of the transition status"],
) -> Annotated[dict, "The transition data, including screen fields available"]:
"""Get a transition available for an issue by the transition name.
The response will contain screen fields available for the transition, if any.
"""
transitions = await get_transitions_available_for_issue(context, issue)
for available_transition in transitions["transitions_available"]:
if available_transition["name"].casefold() == transition.casefold():
return {"issue": issue, "transition": available_transition}
return {
"error": f"Transition '{transition}' not found for the issue '{issue}'",
"transitions_available": transitions["transitions_available"],
}
@tool(
requires_auth=Atlassian(
scopes=[
"read:jira-work", # Needed to get the transition ID by name
"write:jira-work", # Needed to transition the issue
],
),
)
async def transition_issue_to_new_status(
context: ToolContext,
issue: Annotated[str, "The ID or key of the issue"],
transition: Annotated[
str,
"The transition to perform. Provide the transition ID or its name (case insensitive).",
],
) -> Annotated[dict, "The updated issue"]:
"""Transition a Jira issue to a new status."""
client = JiraClient(context.get_auth_token_or_empty())
# Try to get the transition by ID first
response = await get_transition_by_id(context, issue, transition)
# If the transition is not found by ID, try to get it by name
if response.get("error"):
response = await get_transition_by_status_name(context, issue, transition)
if response.get("error"):
return cast(dict, response)
transition_id = response["transition"]["id"]
transition_name = response["transition"]["name"]
# The /issue/issue_id/transitions endpoint returns a 204 No Content in case of success
await client.post(
f"/issue/{issue}/transitions",
json_data={
"transition": {"id": transition_id},
},
)
return {
"status": "success",
"message": f"Issue '{issue}' successfully transitioned to '{transition_name}'.",
}

View file

@ -0,0 +1,151 @@
from typing import Annotated, Any, cast
from arcade.sdk import ToolContext, tool
from arcade.sdk.auth import Atlassian
from arcade.sdk.errors import ToolExecutionError
import arcade_jira.cache as cache
from arcade_jira.client import JiraClient
from arcade_jira.exceptions import NotFoundError
from arcade_jira.utils import add_pagination_to_response, clean_user_dict, remove_none_values
@tool(requires_auth=Atlassian(scopes=["read:jira-user"]))
async def list_users(
context: ToolContext,
account_type: Annotated[
str | None,
"The account type of the users to return. Defaults to 'atlassian'. Provide `None` to "
"disable filtering by account type. The account type filter will be applied after "
"retrieving users from Jira API, thus the tool may return less users than the limit and "
"still have more users to paginate. Check the `pagination` key in the response dictionary.",
] = "atlassian",
limit: Annotated[
int,
"The maximum number of users to return. Min of 1, max of 50. Defaults to 50.",
] = 50,
offset: Annotated[
int,
"The number of users to skip before starting to return users. "
"Defaults to 0 (start from the first user).",
] = 0,
) -> Annotated[dict[str, Any], "The information about all users."]:
"""Browse users in Jira."""
limit = max(min(limit, 50), 1)
client = JiraClient(context.get_auth_token_or_empty())
api_response = await client.get(
"/users/search",
params={
"startAt": offset,
"maxResults": limit,
},
)
items = cast(list[dict[str, Any]], api_response)
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
users = [
clean_user_dict(user, cloud_name)
for user in api_response
if not account_type or user["accountType"].casefold() == account_type.casefold()
]
response = add_pagination_to_response({"users": users}, items, limit, offset)
response["pagination"]["total_results"] = len(users)
return response
@tool(requires_auth=Atlassian(scopes=["read:jira-user"]))
async def get_user_by_id(
context: ToolContext,
user_id: Annotated[str, "The the user's ID."],
) -> Annotated[dict[str, Any], "The user information."]:
"""Get user information by their ID."""
client = JiraClient(context.get_auth_token_or_empty())
not_found = {"error": "User not found"}
try:
response = await client.get("user", params={"accountId": user_id})
except NotFoundError:
return not_found
if not response:
return not_found
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
return {"user": clean_user_dict(response, cloud_name)}
@tool(requires_auth=Atlassian(scopes=["read:jira-user"]))
async def get_users_without_id(
context: ToolContext,
name_or_email: Annotated[
str,
"The user's display name or email address to search for (case-insensitive). The string can "
"match the prefix of the user's attribute. For example, a string of 'john' will match "
"users with a display name or email address that starts with 'john', such as "
"'John Doe', 'Johnson', 'john@example.com', etc.",
],
enforce_exact_match: Annotated[
bool,
"Whether to enforce an exact match of the name_or_email against users' display name and "
"email attributes. Defaults to False (return all users that match the prefix). If set to "
"True, before returning results, the tool will filter users with a display name OR email "
"address that match exactly the value of the `name_or_email` argument.",
] = False,
limit: Annotated[
int,
"The maximum number of users to return. Min of 1, max of 50. Defaults to 50.",
] = 50,
offset: Annotated[
int,
"The number of users to skip before starting to return users. "
"Defaults to 0 (start from the first user).",
] = 0,
) -> Annotated[dict[str, Any], "The information about users that match the search criteria."]:
"""Get users without their account ID, searching by display name and email address.
The Jira user search API will return up to 1,000 (one thousand) users for any given name/email
query. If you need to get more users, please use the `Jira.ListAllUsers` tool.
"""
limit = max(min(limit, 1000), 1)
if limit + offset > 1000:
raise ToolExecutionError(
"The maximum number of users returned by the Jira search API is 1000. "
f"To get more users use the `Jira.{list_users.__tool_name__}` tool."
)
if not name_or_email:
raise ToolExecutionError(
"The `user_name_or_email` argument is required to search for users."
)
client = JiraClient(context.get_auth_token_or_empty())
api_response = await client.get(
"/user/search",
params=remove_none_values({
"query": name_or_email,
"startAt": offset,
"maxResults": limit,
}),
)
cloud_name = cache.get_cloud_name(context.get_auth_token_or_empty())
users = [clean_user_dict(user, cloud_name) for user in api_response]
if enforce_exact_match:
users = [
user
for user in users
if user["name"].casefold() == name_or_email.casefold()
or user["email"].casefold() == name_or_email.casefold()
]
response = {
"users": users,
"query": {
"name_or_email": name_or_email,
"enforce_exact_match": enforce_exact_match,
"limit": limit,
"offset": offset,
},
}
return add_pagination_to_response(response, users, limit, offset, 1000)

File diff suppressed because it is too large Load diff

210
toolkits/jira/conftest.py Normal file
View file

@ -0,0 +1,210 @@
import random
import string
from typing import Any, Callable
from unittest.mock import MagicMock, patch
import httpx
import pytest
from arcade.sdk import ToolAuthorizationContext, ToolContext
from arcade_jira.cache import set_cloud_id, set_cloud_name
@pytest.fixture
def fake_auth_token(generate_random_str: Callable) -> str:
return generate_random_str()
@pytest.fixture
def fake_cloud_id(generate_random_str: Callable) -> str:
return generate_random_str()
@pytest.fixture
def fake_cloud_name(generate_random_str: Callable) -> str:
return generate_random_str()
@pytest.fixture(autouse=True)
def set_cloud_id_cache(fake_auth_token: str, fake_cloud_id: str, fake_cloud_name: str) -> None:
"""This fixture auto-sets cloud ID in the cache to skip the HTTP call to get it"""
set_cloud_id(fake_auth_token, fake_cloud_id)
set_cloud_name(fake_auth_token, fake_cloud_name)
@pytest.fixture
def generate_random_str() -> Callable[[int], str]:
def random_str_builder(length: int = 10) -> str:
return "".join(random.choices(string.ascii_letters + string.digits, k=length)) # noqa: S311
return random_str_builder
@pytest.fixture
def generate_random_email(generate_random_str: Callable) -> Callable[[str | None, str | None], str]:
def random_email_generator(name: str | None = None, domain: str | None = None) -> str:
name = name or generate_random_str()
domain = domain or f"{generate_random_str()}.com"
return f"{name}@{domain}"
return random_email_generator
@pytest.fixture
def generate_random_url(generate_random_str: Callable) -> Callable[[str], str]:
def random_url_generator(base_url: str | None = None) -> str:
base_url = base_url or f"https://{generate_random_str()}.com"
return f"{base_url}/{generate_random_str()}"
return random_url_generator
@pytest.fixture
def mock_context(fake_auth_token: str) -> ToolContext:
mock_auth = ToolAuthorizationContext(token=fake_auth_token)
return ToolContext(authorization=mock_auth)
@pytest.fixture
def mock_httpx_client():
with patch("arcade_jira.client.httpx") as mock_httpx:
yield mock_httpx.AsyncClient().__aenter__.return_value
@pytest.fixture
def mock_httpx_response() -> Callable[[int, dict], httpx.Response]:
def generate_mock_httpx_response(status_code: int, json_data: dict) -> httpx.Response:
response = MagicMock(spec=httpx.Response)
response.status_code = status_code
response.json.return_value = json_data
return response
return generate_mock_httpx_response
@pytest.fixture
def build_user_dict(
generate_random_str: Callable[[int], str],
generate_random_email: Callable[[str | None, str | None], str],
) -> Callable[[str | None, str | None, str | None, bool, str], dict]:
def user_dict_builder(
id_: str | None = None,
email: str | None = None,
display_name: str | None = None,
active: bool = True,
account_type: str = "atlassian",
) -> dict[str, Any]:
display_name = display_name or generate_random_str()
user = {
"accountId": id_ or generate_random_str(),
"displayName": display_name,
"emailAddress": email or generate_random_email(name=display_name),
"active": active,
"accountType": account_type,
}
return user
return user_dict_builder
@pytest.fixture
def build_project_dict(
generate_random_str: Callable,
generate_random_url: Callable,
) -> Callable[[str | None, str | None, str | None, str | None, str | None], dict]:
def project_dict_builder(
id_: str | None = None,
key: str | None = None,
name: str | None = None,
description: str | None = None,
url: str | None = None,
) -> dict[str, Any]:
return {
"id": id_ or generate_random_str(),
"key": key or generate_random_str(),
"name": name or generate_random_str(),
"description": description or generate_random_str(),
"url": url or generate_random_url(),
}
return project_dict_builder
@pytest.fixture
def build_project_search_response_dict() -> Callable[[list[dict], bool], dict]:
def project_search_response_builder(projects: list[dict], is_last: bool = True) -> dict:
return {
"values": projects,
"isLast": is_last,
}
return project_search_response_builder
@pytest.fixture
def build_priority_dict(
generate_random_str: Callable,
) -> Callable[[str | None, str | None, str | None], dict]:
def priority_dict_builder(
id_: str | None = None,
name: str | None = None,
description: str | None = None,
) -> dict:
return {
"id": id_ or generate_random_str(),
"name": name or generate_random_str(),
"description": description or generate_random_str(),
}
return priority_dict_builder
@pytest.fixture
def build_issue_type_dict(
generate_random_str: Callable,
) -> Callable[[str | None, str | None, str | None], dict]:
def issue_type_dict_builder(
id_: str | None = None, name: str | None = None, description: str | None = None
) -> dict:
return {
"id": id_ or generate_random_str(),
"name": name or generate_random_str(),
"description": description or generate_random_str(),
}
return issue_type_dict_builder
@pytest.fixture
def build_issue_types_response_dict() -> Callable[[list[dict]], dict]:
def issue_types_response_builder(
issue_types: list[dict],
is_last: bool = True,
) -> dict:
return {
"issueTypes": issue_types,
"isLast": is_last,
}
return issue_types_response_builder
@pytest.fixture
def build_priority_scheme_dict(
generate_random_str: Callable,
) -> Callable[[str | None, str | None, str | None, bool], dict]:
def priority_scheme_dict_builder(
id_: str | None = None,
name: str | None = None,
description: str | None = None,
is_default: bool = False,
) -> dict:
return {
"id": id_ or generate_random_str(),
"name": name or generate_random_str(),
"description": description or generate_random_str(),
"isDefault": is_default,
}
return priority_scheme_dict_builder

View file

@ -0,0 +1,380 @@
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
from arcade.sdk.eval.critic import BinaryCritic
import arcade_jira
from arcade_jira.critics import (
CaseInsensitiveBinaryCritic,
CaseInsensitiveListOfStringsBinaryCritic,
)
from arcade_jira.tools.issues import (
add_labels_to_issue,
create_issue,
remove_labels_from_issue,
update_issue,
)
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_jira)
@tool_eval()
def create_issue_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="Create issue eval suite",
system_message=(
"You are an AI assistant with access to Jira tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Create issue",
user_message="Create a 'High' priority task for John Doe with the following properties: "
"title: 'Test issue', "
"description: 'This is a test issue', "
"project: 'ENG-123', "
"issue_type: 'Task', "
"due on '2025-06-30'. "
"Label it with Hello and World.",
expected_tool_calls=[
ExpectedToolCall(
func=create_issue,
args={
"title": "Test issue",
"description": "This is a test issue",
"project": "ENG-123",
"issue_type": "Task",
"priority": "High",
"assignee": "John Doe",
"due_date": "2025-06-30",
"labels": ["Hello", "World"],
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="title", weight=1 / 8),
CaseInsensitiveBinaryCritic(critic_field="description", weight=1 / 8),
CaseInsensitiveBinaryCritic(critic_field="project", weight=1 / 8),
CaseInsensitiveBinaryCritic(critic_field="issue_type", weight=1 / 8),
CaseInsensitiveBinaryCritic(critic_field="priority", weight=1 / 8),
CaseInsensitiveBinaryCritic(critic_field="assignee", weight=1 / 8),
BinaryCritic(critic_field="due_date", weight=1 / 8),
CaseInsensitiveListOfStringsBinaryCritic(critic_field="labels", weight=1 / 8),
],
)
suite.add_case(
name="Create issue with parent and reporter",
user_message=(
"Create a task for John Doe to 'Implement message queue service' as a child of the issue ENG-321 "
"and reported by Jenifer Bear. It should be due on 2025-06-30. Label it with 'Project XYZ'."
),
expected_tool_calls=[
ExpectedToolCall(
func=create_issue,
args={
"title": "Implement message queue service",
"parent_issue": "ENG-321",
"issue_type": "Task",
"assignee": "John Doe",
"reporter": "Jenifer Bear",
"due_date": "2025-06-30",
"labels": ["Project XYZ"],
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="title", weight=1 / 7),
CaseInsensitiveBinaryCritic(critic_field="parent_issue", weight=1 / 7),
CaseInsensitiveBinaryCritic(critic_field="issue_type", weight=1 / 7),
CaseInsensitiveBinaryCritic(critic_field="assignee", weight=1 / 7),
CaseInsensitiveBinaryCritic(critic_field="reporter", weight=1 / 7),
BinaryCritic(critic_field="due_date", weight=1 / 7),
CaseInsensitiveListOfStringsBinaryCritic(critic_field="labels", weight=1 / 7),
],
)
return suite
@tool_eval()
def labels_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="Labels eval suite",
system_message=(
"You are an AI assistant with access to Jira tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Add labels",
user_message="Add the labels 'Hello' and 'World' to the issue ENG-123.",
expected_tool_calls=[
ExpectedToolCall(
func=add_labels_to_issue,
args={
"issue": "ENG-123",
"labels": ["Hello", "World"],
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveListOfStringsBinaryCritic(critic_field="labels", weight=0.5),
],
)
suite.add_case(
name="Add labels without notifying watchers",
user_message="Add the labels 'Hello' and 'World' to the issue ENG-123. Do not notify watchers.",
expected_tool_calls=[
ExpectedToolCall(
func=add_labels_to_issue,
args={
"issue": "ENG-123",
"labels": ["Hello", "World"],
"notify_watchers": False,
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=1 / 3),
CaseInsensitiveListOfStringsBinaryCritic(critic_field="labels", weight=1 / 3),
BinaryCritic(critic_field="notify_watchers", weight=1 / 3),
],
)
suite.add_case(
name="Remove labels",
user_message="Remove the labels 'Hello' and 'World' from the issue ENG-123.",
expected_tool_calls=[
ExpectedToolCall(
func=remove_labels_from_issue,
args={
"issue": "ENG-123",
"labels": ["Hello", "World"],
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveListOfStringsBinaryCritic(critic_field="labels", weight=0.5),
],
)
suite.add_case(
name="Remove labels without notifying watchers",
user_message="Remove the labels 'Hello' and 'World' from the issue ENG-123. Do not notify watchers.",
expected_tool_calls=[
ExpectedToolCall(
func=remove_labels_from_issue,
args={
"issue": "ENG-123",
"labels": ["Hello", "World"],
"notify_watchers": False,
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=1 / 3),
CaseInsensitiveListOfStringsBinaryCritic(critic_field="labels", weight=1 / 3),
BinaryCritic(critic_field="notify_watchers", weight=1 / 3),
],
)
return suite
@tool_eval()
def update_issue_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="Update issue eval suite",
system_message=(
"You are an AI assistant with access to Jira tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Update issue with new assignee",
user_message="Change the assignee of the ENG-123 issue to John Doe.",
expected_tool_calls=[
ExpectedToolCall(
func=update_issue,
args={
"issue": "ENG-123",
"assignee": "John Doe",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveBinaryCritic(critic_field="assignee", weight=0.5),
],
)
suite.add_case(
name="Update issue with new priority",
user_message="Set the priority of the ENG-123 issue to high.",
expected_tool_calls=[
ExpectedToolCall(
func=update_issue,
args={
"issue": "ENG-123",
"priority": "High",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveBinaryCritic(critic_field="priority", weight=0.5),
],
)
suite.add_case(
name="Update issue with new due date",
user_message="Set the due date of the ENG-123 issue to 2025-06-30.",
expected_tool_calls=[
ExpectedToolCall(
func=update_issue,
args={
"issue": "ENG-123",
"due_date": "2025-06-30",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveBinaryCritic(critic_field="due_date", weight=0.5),
],
)
suite.add_case(
name="Update issue with new labels",
user_message="Change the labels in the ENG-123 issue to 'Hello' and 'World'.",
expected_tool_calls=[
ExpectedToolCall(
func=update_issue,
args={
"issue": "ENG-123",
"labels": ["Hello", "World"],
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveListOfStringsBinaryCritic(critic_field="labels", weight=0.5),
],
)
suite.add_case(
name="Update issue with new title and description",
user_message="Change the title and description of the ENG-123 issue to 'Test issue' and 'This is a test issue'.",
expected_tool_calls=[
ExpectedToolCall(
func=update_issue,
args={
"issue": "ENG-123",
"title": "Test issue",
"description": "This is a test issue",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=1 / 3),
CaseInsensitiveBinaryCritic(critic_field="title", weight=1 / 3),
CaseInsensitiveBinaryCritic(critic_field="description", weight=1 / 3),
],
)
suite.add_case(
name="Clear due date",
user_message="Clear the due date of the issue ENG-123.",
expected_tool_calls=[
ExpectedToolCall(
func=update_issue,
args={
"issue": "ENG-123",
"due_date": "",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveBinaryCritic(critic_field="due_date", weight=0.5),
],
)
suite.add_case(
name="Remove assignee",
user_message="Remove the assignee from the issue ENG-123.",
expected_tool_calls=[
ExpectedToolCall(
func=update_issue,
args={
"issue": "ENG-123",
"assignee": "",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveBinaryCritic(critic_field="assignee", weight=0.5),
],
)
suite.add_case(
name="Remove assignee",
user_message="Remove the assignee from the issue ENG-123 without notifying anyone.",
expected_tool_calls=[
ExpectedToolCall(
func=update_issue,
args={
"issue": "ENG-123",
"assignee": "",
"notify_watchers": False,
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=1 / 3),
CaseInsensitiveBinaryCritic(critic_field="assignee", weight=1 / 3),
BinaryCritic(critic_field="notify_watchers", weight=1 / 3),
],
)
return suite

View file

@ -0,0 +1,437 @@
import json
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
from arcade.sdk.eval.critic import BinaryCritic
import arcade_jira
from arcade_jira.critics import CaseInsensitiveBinaryCritic, HasSubstringCritic
from arcade_jira.tools.issues import (
get_issue_by_id,
get_issues_without_id,
list_issues,
search_issues_with_jql,
)
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_jira)
@tool_eval()
def get_issue_by_id_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="Get issue by ID eval suite",
system_message=(
"You are an AI assistant with access to Jira tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Get issue by ID",
user_message="Get the issue with ID '10000'.",
expected_tool_calls=[
ExpectedToolCall(
func=get_issue_by_id,
args={
"issue_id": "10000",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="issue_id", weight=1.0),
],
)
suite.add_case(
name="Get issue by Key",
user_message="Get the issue ENG-103.",
expected_tool_calls=[
ExpectedToolCall(
func=get_issue_by_id,
args={
"issue_id": "ENG-103",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="issue_id", weight=1.0),
],
)
return suite
@tool_eval()
def get_issues_without_id_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="Get issues without an ID",
system_message=(
"You are an AI assistant with access to Jira tools. "
"Use them to help the user with their tasks. "
"Today is 2025-05-27 (Tuesday)."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Get issues by keywords",
user_message="Find the issue about implementing the message queue.",
expected_tool_calls=[
ExpectedToolCall(
func=get_issues_without_id,
args={
"keywords": "message queue",
},
),
],
rubric=rubric,
critics=[
HasSubstringCritic(critic_field="keywords", weight=1.0),
],
)
suite.add_case(
name="Get issues by due date",
user_message="Which issues are due this month?",
expected_tool_calls=[
ExpectedToolCall(
func=get_issues_without_id,
args={
"due_from": "2025-05-01",
"due_until": "2025-05-31",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="due_from", weight=0.5),
BinaryCritic(critic_field="due_until", weight=0.5),
],
)
suite.add_case(
name="Get issues by assignee, due date, status, priority and issue type",
user_message=(
"Find task issues assigned to John Doe that are in progress, "
"with high priority, and due until the end of this month"
),
expected_tool_calls=[
ExpectedToolCall(
func=get_issues_without_id,
args={
"assignee": "John Doe",
"due_from": None,
"due_until": "2025-05-31",
"status": "in progress",
"priority": "high",
"issue_type": "task",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="due_from", weight=0.1),
BinaryCritic(critic_field="due_until", weight=0.1),
CaseInsensitiveBinaryCritic(critic_field="assignee", weight=0.2),
CaseInsensitiveBinaryCritic(critic_field="status", weight=0.2),
CaseInsensitiveBinaryCritic(critic_field="priority", weight=0.2),
CaseInsensitiveBinaryCritic(critic_field="issue_type", weight=0.2),
],
)
suite.add_case(
name="Get issues by label and project name",
user_message="Find issues labeled with version 2 in the Engineering project",
expected_tool_calls=[
ExpectedToolCall(
func=get_issues_without_id,
args={
"project": "Engineering",
"labels": ["version 2"],
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="project", weight=0.5),
BinaryCritic(critic_field="labels", weight=0.5),
],
)
suite.add_case(
name="Get issues by parent issue",
user_message="Get the children issues of ENG-123",
expected_tool_calls=[
ExpectedToolCall(
func=get_issues_without_id,
args={
"parent_issue": "ENG-123",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="parent_issue", weight=1.0),
],
)
suite.add_case(
name="Paginate issues in multiple chat turns",
user_message="Get the next page of issues",
expected_tool_calls=[
ExpectedToolCall(
func=get_issues_without_id,
args={
"assignee": "john doe",
"priority": "high",
"status": "in progress",
"issue_type": "task",
"limit": 2,
"offset": 4,
"next_page_token": "1234567890",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="assignee", weight=1 / 7),
CaseInsensitiveBinaryCritic(critic_field="priority", weight=1 / 7),
CaseInsensitiveBinaryCritic(critic_field="status", weight=1 / 7),
CaseInsensitiveBinaryCritic(critic_field="issue_type", weight=1 / 7),
BinaryCritic(critic_field="limit", weight=1 / 7),
BinaryCritic(critic_field="offset", weight=1 / 7),
BinaryCritic(critic_field="next_page_token", weight=1 / 7),
],
additional_messages=[
{
"role": "user",
"content": "Find 2 tasks assigned to John Doe that are in progress, with high priority",
},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Jira_GetIssuesWithoutId",
"arguments": json.dumps({
"assignee": "John Doe",
"priority": "high",
"status": "in progress",
"issue_type": "task",
"limit": 2,
"offset": 0,
}),
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"issues": [
{
"id": "10001",
"key": "ENG-101",
"summary": "Implement the message queue",
"assignee": {
"id": "10010",
"name": "John Doe",
"email": "john.doe@example.com",
},
"status": {
"id": "10020",
"name": "In Progress",
},
"priority": {
"id": "10030",
"name": "High",
},
"issue_type": {
"id": "10040",
"name": "Task",
},
"project": {
"id": "10050",
"key": "ENG",
"name": "Engineering",
},
},
{
"id": "10002",
"key": "ENG-102",
"summary": "Deploy the message queue system",
"assignee": {
"id": "10010",
"name": "John Doe",
"email": "john.doe@example.com",
},
"status": {
"id": "10020",
"name": "In Progress",
},
"priority": {
"id": "10030",
"name": "High",
},
"issue_type": {
"id": "10040",
"name": "Task",
},
"project": {
"id": "10050",
"key": "ENG",
"name": "Engineering",
},
},
],
"pagination": {
"limit": 2,
"total_results": 2,
"next_page_token": "1234567890",
},
}),
"tool_call_id": "call_1",
"name": "Jira_GetIssuesWithoutId",
},
{
"role": "assistant",
"content": "Here are two issues:\n\n1. ENG-101: Implement the message queue\n2. ENG-102: Deploy the message queue system",
},
],
)
return suite
@tool_eval()
def search_issues_with_jql_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="Search issues with JQL",
system_message=(
"You are an AI assistant with access to Jira tools. "
"Use them to help the user with their tasks. "
"Today is 2025-05-27 (Tuesday)."
),
catalog=catalog,
rubric=rubric,
)
jql_query_str = 'text ~ "message queue" AND dueDate <= 2025-05-31'
suite.add_case(
name="Search issues by keywords",
user_message=f"Search for up to 10 issues using the JQL query: {jql_query_str}",
expected_tool_calls=[
ExpectedToolCall(
func=search_issues_with_jql,
args={
"jql": jql_query_str,
"limit": 10,
"offset": 0,
},
),
],
rubric=rubric,
critics=[
HasSubstringCritic(critic_field="jql", weight=1 / 3),
BinaryCritic(critic_field="limit", weight=1 / 3),
BinaryCritic(critic_field="offset", weight=1 / 3),
],
)
return suite
@tool_eval()
def list_issues_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="List issues eval suite",
system_message=(
"You are an AI assistant with access to Jira tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Get me any one issue in Jira",
user_message="Get me one issue in Jira.",
expected_tool_calls=[
ExpectedToolCall(
func=list_issues,
args={
"limit": 1,
"next_page_token": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.5),
BinaryCritic(critic_field="next_page_token", weight=0.5),
],
)
suite.add_case(
name="List 10 issues in Jira",
user_message="List 10 issues in Jira.",
expected_tool_calls=[
ExpectedToolCall(
func=list_issues,
args={
"limit": 10,
"next_page_token": None,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="limit", weight=0.5),
BinaryCritic(critic_field="next_page_token", weight=0.5),
],
)
suite.add_case(
name="List 10 issues in the Arcade project",
user_message="List 10 issues in the Arcade project.",
expected_tool_calls=[
ExpectedToolCall(
func=list_issues,
args={
"project": "Arcade",
"limit": 50,
"next_page_token": None,
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="project", weight=1 / 3),
BinaryCritic(critic_field="limit", weight=1 / 3),
BinaryCritic(critic_field="next_page_token", weight=1 / 3),
],
)
return suite

View file

@ -0,0 +1,237 @@
import json
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
from arcade.sdk.eval.critic import BinaryCritic
import arcade_jira
from arcade_jira.tools.issues import (
get_issue_type_by_id,
list_issue_types_by_project,
)
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_jira)
@tool_eval()
def list_issue_types_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="List issue types eval suite",
system_message=(
"You are an AI assistant with access to Jira tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="List issue types in a project (ID)",
user_message="List the issue types in the project with ID '1234567890'.",
expected_tool_calls=[
ExpectedToolCall(
func=list_issue_types_by_project,
args={
"project": "1234567890",
"limit": 200,
"offset": 0,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="project", weight=0.8),
BinaryCritic(critic_field="limit", weight=0.1),
BinaryCritic(critic_field="offset", weight=0.1),
],
)
suite.add_case(
name="List issue types in a project (Key)",
user_message="List the issue types in the project PRJ-1.",
expected_tool_calls=[
ExpectedToolCall(
func=list_issue_types_by_project,
args={
"project": "PRJ-1",
"limit": 200,
"offset": 0,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="project", weight=0.8),
BinaryCritic(critic_field="limit", weight=0.1),
BinaryCritic(critic_field="offset", weight=0.1),
],
)
suite.add_case(
name="List issue types in a project (name)",
user_message="List the issue types in the project 'Engineering'.",
expected_tool_calls=[
ExpectedToolCall(
func=list_issue_types_by_project,
args={
"project": "Engineering",
"limit": 200,
"offset": 0,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="project", weight=0.8),
BinaryCritic(critic_field="limit", weight=0.1),
BinaryCritic(critic_field="offset", weight=0.1),
],
)
suite.add_case(
name="List issue types in a project (name) with pagination in the prompt",
user_message="List 10 issue types in the project 'Engineering'. Skip the first 30 items.",
expected_tool_calls=[
ExpectedToolCall(
func=list_issue_types_by_project,
args={
"project": "Engineering",
"limit": 10,
"offset": 30,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="project", weight=1 / 3),
BinaryCritic(critic_field="limit", weight=1 / 3),
BinaryCritic(critic_field="offset", weight=1 / 3),
],
)
suite.add_case(
name="List issue types in a project (name) with pagination from chat history",
user_message="Get the next items.",
expected_tool_calls=[
ExpectedToolCall(
func=list_issue_types_by_project,
args={
"project": "Engineering",
"limit": 2,
"offset": 2,
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="project", weight=1 / 3),
BinaryCritic(critic_field="limit", weight=1 / 3),
BinaryCritic(critic_field="offset", weight=1 / 3),
],
additional_messages=[
{
"role": "user",
"content": "List 2 issue types in the project 'Engineering'.",
},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "Jira_ListIssueTypesByProject",
"arguments": json.dumps({
"project": "Engineering",
"limit": 2,
"offset": 0,
}),
},
}
],
},
{
"role": "tool",
"content": json.dumps({
"project": {
"id": "10000",
"key": "Eng-1",
"name": "Engineering",
},
"issue_types": [
{
"id": "10001",
"name": "Bug",
"description": "A bug is an error or flaw in a software application or website.",
},
{
"id": "10002",
"name": "Task",
"description": "A task is a unit of work that needs to be completed.",
},
],
"pagination": {
"limit": 2,
"total_results": 2,
"next_offset": 2,
},
}),
"tool_call_id": "call_1",
"name": "Jira_ListIssueTypesByProject",
},
{
"role": "assistant",
"content": "Here are two issue types in the project 'Engineering':\n\n1. Bug\n2. Task",
},
],
)
return suite
@tool_eval()
def get_issue_type_by_id_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="Get issue type by ID eval suite",
system_message=(
"You are an AI assistant with access to Jira tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Get issue type by ID",
user_message="Get the issue type with ID '10001'.",
expected_tool_calls=[
ExpectedToolCall(
func=get_issue_type_by_id,
args={
"issue_type": "10001",
},
),
],
rubric=rubric,
critics=[
BinaryCritic(critic_field="project", weight=0.8),
BinaryCritic(critic_field="limit", weight=0.1),
BinaryCritic(critic_field="offset", weight=0.1),
],
)
return suite

View file

@ -0,0 +1,152 @@
from arcade.sdk import ToolCatalog
from arcade.sdk.eval import (
EvalRubric,
EvalSuite,
ExpectedToolCall,
tool_eval,
)
import arcade_jira
from arcade_jira.critics import (
CaseInsensitiveBinaryCritic,
)
from arcade_jira.tools.transitions import (
get_transition_by_status_name,
get_transitions_available_for_issue,
transition_issue_to_new_status,
)
# Evaluation rubric
rubric = EvalRubric(
fail_threshold=0.85,
warn_threshold=0.95,
)
catalog = ToolCatalog()
catalog.add_module(arcade_jira)
@tool_eval()
def transitions_eval_suite() -> EvalSuite:
suite = EvalSuite(
name="Transitions eval suite",
system_message=(
"You are an AI assistant with access to Jira tools. "
"Use them to help the user with their tasks."
),
catalog=catalog,
rubric=rubric,
)
suite.add_case(
name="Get transitions available for issue",
user_message="Get the transitions available for the issue ENG-123.",
expected_tool_calls=[
ExpectedToolCall(
func=get_transitions_available_for_issue,
args={
"issue": "ENG-123",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=1),
],
)
suite.add_case(
name="Can I transition an issue to status 'Done'?",
user_message="Can I transition the issue ENG-123 to the status 'Done'?",
expected_tool_calls=[
ExpectedToolCall(
func=get_transitions_available_for_issue,
args={
"issue": "ENG-123",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=1),
],
)
suite.add_case(
name="Get transition by status name",
user_message="Get the transition for the issue ENG-123 and status 'Done'.",
expected_tool_calls=[
ExpectedToolCall(
func=get_transition_by_status_name,
args={
"issue": "ENG-123",
"transition": "Done",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveBinaryCritic(critic_field="transition", weight=0.5),
],
)
suite.add_case(
name="Transition issue to a new status",
user_message="Transition the issue ENG-123 to the status 'Done'.",
expected_tool_calls=[
ExpectedToolCall(
func=transition_issue_to_new_status,
args={
"issue": "ENG-123",
"transition": "Done",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveBinaryCritic(critic_field="transition", weight=0.5),
],
)
suite.add_case(
name="Mark issue as done",
user_message="Mark the issue ENG-123 as done.",
expected_tool_calls=[
ExpectedToolCall(
func=transition_issue_to_new_status,
args={
"issue": "ENG-123",
"transition": "Done",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveBinaryCritic(critic_field="transition", weight=0.5),
],
)
suite.add_case(
name="Update issue with new status",
user_message="Update the issue ENG-123 status to in progress.",
expected_tool_calls=[
ExpectedToolCall(
func=transition_issue_to_new_status,
args={
"issue": "ENG-123",
"transition": "in progress",
},
),
],
rubric=rubric,
critics=[
CaseInsensitiveBinaryCritic(critic_field="issue", weight=0.5),
CaseInsensitiveBinaryCritic(critic_field="transition", weight=0.5),
],
)
return suite

View file

@ -0,0 +1,42 @@
[tool.poetry]
name = "arcade_jira"
version = "0.1.0"
description = "Arcade tools designed for LLMs to interact with Atlassian Jira"
authors = ["Arcade <dev@arcade.dev>"]
[tool.poetry.dependencies]
python = "^3.10"
arcade-ai = ">=1.4.0,<2.0"
httpx = "^0.27.2"
[tool.poetry.dev-dependencies]
pytest = "^8.3.0"
pytest-cov = "^4.0.0"
pytest-asyncio = "^0.24.0"
pytest-mock = "^3.11.1"
mypy = "^1.5.1"
pre-commit = "^3.4.0"
tox = "^4.11.1"
ruff = "^0.7.4"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.mypy]
files = ["arcade_jira/**/*.py"]
python_version = "3.10"
disallow_untyped_defs = "True"
disallow_any_unimported = "True"
no_implicit_optional = "True"
check_untyped_defs = "True"
warn_return_any = "True"
warn_unused_ignores = "True"
show_error_codes = "True"
ignore_missing_imports = "True"
[tool.pytest.ini_options]
testpaths = ["tests"]
[tool.coverage.report]
skip_empty = true

View file

@ -0,0 +1,225 @@
from typing import Callable
import httpx
import pytest
from arcade.sdk import ToolContext
from arcade_jira.exceptions import NotFoundError
from arcade_jira.utils import clean_priority_dict, find_priorities_by_project
@pytest.mark.asyncio
async def test_find_priorities_by_project_with_no_priority_schemes(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
):
list_priority_schemes_response = mock_httpx_response(
200,
{
"values": [],
"isLast": True,
},
)
mock_httpx_client.get.return_value = list_priority_schemes_response
with pytest.raises(NotFoundError) as exc:
await find_priorities_by_project(mock_context, {})
assert "No priority schemes found" in exc.value.message
@pytest.mark.asyncio
async def test_find_priorities_by_project_when_project_does_not_exist(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_priority_scheme_dict: Callable,
):
sample_project = build_project_dict()
priority_scheme = build_priority_scheme_dict()
list_priority_schemes_response = mock_httpx_response(
200,
{"values": [priority_scheme], "isLast": True},
)
find_project_by_id_response = mock_httpx_response(404, {})
search_projects_response = mock_httpx_response(200, {"values": [], "isLast": True})
mock_httpx_client.get.side_effect = [
list_priority_schemes_response,
find_project_by_id_response,
search_projects_response,
]
response = await find_priorities_by_project(mock_context, sample_project)
assert response == {"error": f"Project not found with name/key/ID '{sample_project['id']}'"}
@pytest.mark.asyncio
async def test_find_priorities_by_project_when_project_is_found_but_does_not_match_id(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_priority_scheme_dict: Callable,
):
sample_project = build_project_dict()
other_project = build_project_dict(name=sample_project["name"])
priority_scheme = build_priority_scheme_dict()
list_priority_schemes_response = mock_httpx_response(
200,
{"values": [priority_scheme], "isLast": True},
)
find_project_by_id_response = mock_httpx_response(404, {})
search_projects_response = mock_httpx_response(
200,
{"values": [other_project], "isLast": True},
)
list_projects_response = mock_httpx_response(
200,
{"values": [other_project], "isLast": True},
)
mock_httpx_client.get.side_effect = [
list_priority_schemes_response,
find_project_by_id_response,
search_projects_response,
list_projects_response,
]
response = await find_priorities_by_project(mock_context, sample_project)
assert response == {
"error": f"No priority schemes found for the project {sample_project['id']}"
}
@pytest.mark.asyncio
async def test_find_priorities_by_project_happy_path(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_priority_dict: Callable,
build_priority_scheme_dict: Callable,
):
sample_project = build_project_dict()
other_project = build_project_dict(name=sample_project["name"])
priority_scheme = build_priority_scheme_dict()
priority1 = build_priority_dict()
priority2 = build_priority_dict()
list_priority_schemes_response = mock_httpx_response(
200,
{"values": [priority_scheme], "isLast": True},
)
find_project_by_id_response = mock_httpx_response(404, {})
search_projects_response = mock_httpx_response(
200,
{"values": [sample_project], "isLast": True},
)
list_projects_response = mock_httpx_response(
200,
{"values": [sample_project, other_project], "isLast": True},
)
list_priorities_response = mock_httpx_response(
200,
{"values": [priority1, priority2], "isLast": True},
)
mock_httpx_client.get.side_effect = [
list_priority_schemes_response,
find_project_by_id_response,
search_projects_response,
list_projects_response,
list_priorities_response,
]
response = await find_priorities_by_project(mock_context, sample_project)
assert response["priorities_available"] == [
clean_priority_dict(priority1),
clean_priority_dict(priority2),
]
@pytest.mark.asyncio
async def test_find_priorities_by_project_happy_path_with_repeated_priorities_across_schemes(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_priority_dict: Callable,
build_priority_scheme_dict: Callable,
):
sample_project = build_project_dict()
other_project = build_project_dict(name=sample_project["name"])
priority_scheme1 = build_priority_scheme_dict()
priority_scheme2 = build_priority_scheme_dict()
priority1 = build_priority_dict()
priority2 = build_priority_dict()
priority3 = build_priority_dict()
list_priority_schemes_response = mock_httpx_response(
200,
{"values": [priority_scheme1, priority_scheme2], "isLast": True},
)
find_project_by_id_response = mock_httpx_response(200, sample_project)
list_projects_by_priority_scheme_response1 = mock_httpx_response(
200,
{"values": [sample_project, other_project], "isLast": True},
)
list_projects_by_priority_scheme_response2 = mock_httpx_response(
200,
{"values": [sample_project], "isLast": True},
)
list_priorities_by_scheme_response1 = mock_httpx_response(
200,
{"values": [priority1, priority2], "isLast": True},
)
list_priorities_by_scheme_response2 = mock_httpx_response(
200,
{"values": [priority2, priority3], "isLast": True},
)
def get_httpx_response(url: str, *args, **kwargs) -> httpx.Response:
if url.endswith("/priorityscheme"):
return list_priority_schemes_response
elif url.endswith(f"/project/{sample_project['id']}"):
return find_project_by_id_response
elif url.endswith(f"/priorityscheme/{priority_scheme1['id']}/projects"):
return list_projects_by_priority_scheme_response1
elif url.endswith(f"/priorityscheme/{priority_scheme2['id']}/projects"):
return list_projects_by_priority_scheme_response2
elif url.endswith(f"/priorityscheme/{priority_scheme1['id']}/priorities"):
return list_priorities_by_scheme_response1
elif url.endswith(f"/priorityscheme/{priority_scheme2['id']}/priorities"):
return list_priorities_by_scheme_response2
else:
raise ValueError(f"Unexpected URL: {url}")
mock_httpx_client.get.side_effect = get_httpx_response
response = await find_priorities_by_project(mock_context, sample_project)
assert len(response["priorities_available"]) == 3
assert clean_priority_dict(priority1) in response["priorities_available"]
assert clean_priority_dict(priority2) in response["priorities_available"]
assert clean_priority_dict(priority3) in response["priorities_available"]

View file

@ -0,0 +1,235 @@
import json
from typing import Callable
import pytest
from arcade.sdk import ToolContext
from arcade_jira.exceptions import JiraToolExecutionError, MultipleItemsFoundError, NotFoundError
from arcade_jira.utils import clean_issue_type_dict, find_unique_issue_type
@pytest.mark.asyncio
async def test_find_unique_issue_type_by_id_success(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_issue_type_dict: Callable,
):
sample_issue_type = build_issue_type_dict()
issue_type_response = mock_httpx_response(200, sample_issue_type)
mock_httpx_client.get.return_value = issue_type_response
response = await find_unique_issue_type(mock_context, sample_issue_type["id"], "123")
assert response == clean_issue_type_dict(sample_issue_type)
@pytest.mark.asyncio
async def test_find_unique_issue_type_by_name_with_a_single_match(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_issue_type_dict: Callable,
build_project_dict: Callable,
build_issue_types_response_dict: Callable,
):
sample_project = build_project_dict()
sample_issue_type = build_issue_type_dict()
# It will first try to get the issue type by ID, we simulate a not found response
get_issue_type_by_id_response = mock_httpx_response(404, {})
# When it tries to get the issue type by name, it will first query the project data
get_project_by_id_response = mock_httpx_response(200, sample_project)
# Then it will query the issue types available to the project
list_issue_types_response = mock_httpx_response(
200, build_issue_types_response_dict([sample_issue_type], is_last=True)
)
mock_httpx_client.get.side_effect = [
get_issue_type_by_id_response,
get_project_by_id_response,
list_issue_types_response,
]
response = await find_unique_issue_type(
mock_context, sample_issue_type["name"].lower(), sample_project["id"]
)
assert response == clean_issue_type_dict(sample_issue_type)
@pytest.mark.asyncio
async def test_find_unique_issue_type_by_name_when_project_does_not_exist(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_issue_type_dict: Callable,
build_issue_types_response_dict: Callable,
build_project_search_response_dict: Callable,
):
sample_project = build_project_dict()
sample_issue_type = build_issue_type_dict()
# It will first try to get the issue type by ID
get_issue_type_by_id_response = mock_httpx_response(404, {})
# When it tries to get the project by id, we'll simulate a 404 error
get_project_by_id_response = mock_httpx_response(404, {})
# And also simulate no results found from search_projects
search_projects_response = mock_httpx_response(200, build_project_search_response_dict([]))
mock_httpx_client.get.side_effect = [
get_issue_type_by_id_response,
get_project_by_id_response,
search_projects_response,
]
with pytest.raises(JiraToolExecutionError) as exc:
await find_unique_issue_type(
mock_context, sample_issue_type["name"].lower(), sample_project["id"]
)
assert f"Project not found with name/key/ID '{sample_project['id']}'" in exc.value.message
@pytest.mark.asyncio
async def test_find_unique_issue_type_by_name_with_multiple_priorities_but_zero_matches(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_issue_type_dict: Callable,
build_issue_types_response_dict: Callable,
):
sample_project = build_project_dict()
sample_issue_type = build_issue_type_dict()
other_issue_type1 = build_issue_type_dict(name=sample_issue_type["name"] + "1")
other_issue_type2 = build_issue_type_dict(name=sample_issue_type["name"] + "2")
# It will first try to get the issue type by ID
get_issue_type_by_id_response = mock_httpx_response(404, {})
# When it tries to get the issue type by name, it will first query the project data
get_project_by_id_response = mock_httpx_response(200, sample_project)
# Then it will query the issue types available to the project
search_issue_types_response = mock_httpx_response(
200, build_issue_types_response_dict([other_issue_type1, other_issue_type2], is_last=True)
)
mock_httpx_client.get.side_effect = [
get_issue_type_by_id_response,
get_project_by_id_response,
search_issue_types_response,
]
with pytest.raises(NotFoundError) as exc:
await find_unique_issue_type(
mock_context, sample_issue_type["name"].lower(), sample_project["id"]
)
available_issue_types = json.dumps([
{
"id": other_issue_type1["id"],
"name": other_issue_type1["name"],
},
{
"id": other_issue_type2["id"],
"name": other_issue_type2["name"],
},
])
assert (
f"Issue type not found with ID or name '{sample_issue_type['name'].lower()}'. "
f"These are the issue types available for the project: {available_issue_types}"
== exc.value.message
)
@pytest.mark.asyncio
async def test_find_unique_issue_type_by_name_with_multiple_issue_types_but_one_match(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_issue_type_dict: Callable,
build_issue_types_response_dict: Callable,
):
sample_project = build_project_dict()
sample_issue_type = build_issue_type_dict()
other_issue_type1 = build_issue_type_dict(name=sample_issue_type["name"] + "1")
other_issue_type2 = build_issue_type_dict(name=sample_issue_type["name"] + "2")
# It will first try to get the issue type by ID
get_issue_type_by_id_response = mock_httpx_response(404, {})
# When it tries to get the priority by name, it will first query the project data
get_project_by_id_response = mock_httpx_response(200, sample_project)
# Then it will query the issue types available to the project
search_issue_types_response = mock_httpx_response(
200,
build_issue_types_response_dict(
[sample_issue_type, other_issue_type1, other_issue_type2],
is_last=True,
),
)
mock_httpx_client.get.side_effect = [
get_issue_type_by_id_response,
get_project_by_id_response,
search_issue_types_response,
]
response = await find_unique_issue_type(
mock_context, sample_issue_type["name"].lower(), sample_project["id"]
)
assert response == clean_issue_type_dict(sample_issue_type)
@pytest.mark.asyncio
async def test_find_unique_issue_type_by_name_with_multiple_issue_types_and_multiple_matches(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_issue_type_dict: Callable,
build_issue_types_response_dict: Callable,
):
sample_project = build_project_dict()
sample_issue_type = build_issue_type_dict()
other_issue_type1 = build_issue_type_dict(name=sample_issue_type["name"])
other_issue_type2 = build_issue_type_dict()
# It will first try to get the issue type by ID
get_issue_type_by_id_response = mock_httpx_response(404, {})
# When it tries to get the priority by name, it will first query the project data
get_project_by_id_response = mock_httpx_response(200, sample_project)
# Then it will query the issue types available to the project
search_issue_types_response = mock_httpx_response(
200,
build_issue_types_response_dict(
[sample_issue_type, other_issue_type1, other_issue_type2],
is_last=True,
),
)
mock_httpx_client.get.side_effect = [
get_issue_type_by_id_response,
get_project_by_id_response,
search_issue_types_response,
]
with pytest.raises(MultipleItemsFoundError) as exc:
await find_unique_issue_type(
mock_context, sample_issue_type["name"].lower(), sample_project["id"]
)
assert sample_issue_type["id"] in exc.value.message
assert other_issue_type1["id"] in exc.value.message
assert other_issue_type2["id"] not in exc.value.message

View file

@ -0,0 +1,227 @@
from typing import Callable
from unittest.mock import patch
import pytest
from arcade.sdk import ToolContext
from arcade_jira.exceptions import JiraToolExecutionError, MultipleItemsFoundError, NotFoundError
from arcade_jira.utils import clean_priority_dict, find_unique_priority
@pytest.mark.asyncio
async def test_find_unique_priority_by_id_success(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_priority_dict: Callable,
):
sample_priority = build_priority_dict()
priority_response = mock_httpx_response(200, sample_priority)
mock_httpx_client.get.return_value = priority_response
response = await find_unique_priority(mock_context, sample_priority["id"], "123")
assert response == clean_priority_dict(sample_priority)
@pytest.mark.asyncio
@patch("arcade_jira.tools.priorities.find_priorities_by_project")
async def test_find_unique_priority_by_name_with_a_single_match(
mock_find_priorities_by_project,
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_priority_dict: Callable,
):
sample_project = build_project_dict()
sample_priority = build_priority_dict()
# It will first try to get the priority by ID
get_priority_by_id_response = mock_httpx_response(404, {})
# When it tries to get the priority by name, it will first query the project data
get_project_by_id_response = mock_httpx_response(200, sample_project)
# Then it will query the priorities available to the project
mock_find_priorities_by_project.return_value = {
"project": sample_project,
"priorities_available": [sample_priority],
}
mock_httpx_client.get.side_effect = [
get_priority_by_id_response,
get_project_by_id_response,
]
response = await find_unique_priority(
mock_context, sample_priority["name"].lower(), sample_project["id"]
)
assert response == clean_priority_dict(sample_priority)
@pytest.mark.asyncio
@patch("arcade_jira.tools.priorities.find_priorities_by_project")
async def test_find_unique_priority_by_name_when_project_does_not_exist(
mock_find_priorities_by_project,
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_priority_dict: Callable,
build_project_search_response_dict: Callable,
):
sample_project = build_project_dict()
sample_priority = build_priority_dict()
# It will first try to get the priority by ID
get_priority_by_id_response = mock_httpx_response(404, {})
# When it tries to get the project by id, we'll simulate a 404 error
get_project_by_id_response = mock_httpx_response(404, {})
# And also simulate no results found from search_projects
search_projects_response = mock_httpx_response(200, build_project_search_response_dict([]))
# We'll still simulate a find_priorities_by_project response, but this should not be called
mock_find_priorities_by_project.return_value = {
"project": sample_project,
"priorities_available": [sample_priority],
}
mock_httpx_client.get.side_effect = [
get_priority_by_id_response,
get_project_by_id_response,
search_projects_response,
]
with pytest.raises(JiraToolExecutionError) as exc:
await find_unique_priority(
mock_context, sample_priority["name"].lower(), sample_project["id"]
)
mock_find_priorities_by_project.assert_not_called()
assert f"Project not found with name/key/ID '{sample_project['id']}'" in exc.value.message
@pytest.mark.asyncio
@patch("arcade_jira.tools.priorities.find_priorities_by_project")
async def test_find_unique_priority_by_name_with_multiple_priorities_but_zero_matches(
mock_find_priorities_by_project,
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_priority_dict: Callable,
):
sample_project = build_project_dict()
sample_priority = build_priority_dict()
other_priority1 = build_priority_dict(name=sample_priority["name"] + "1")
other_priority2 = build_priority_dict(name=sample_priority["name"] + "2")
# It will first try to get the priority by ID
get_priority_by_id_response = mock_httpx_response(404, {})
# When it tries to get the priority by name, it will first query the project data
get_project_by_id_response = mock_httpx_response(200, sample_project)
# Then it will query the priorities available to the project
mock_find_priorities_by_project.return_value = {
"project": sample_project,
"priorities_available": [other_priority1, other_priority2],
}
mock_httpx_client.get.side_effect = [
get_priority_by_id_response,
get_project_by_id_response,
]
with pytest.raises(NotFoundError) as exc:
await find_unique_priority(
mock_context, sample_priority["name"].lower(), sample_project["id"]
)
assert (
f"Priority not found with ID or name '{sample_priority['name'].lower()}'"
== exc.value.message
)
@pytest.mark.asyncio
@patch("arcade_jira.tools.priorities.find_priorities_by_project")
async def test_find_unique_priority_by_name_with_multiple_priorities_but_one_match(
mock_find_priorities_by_project,
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_priority_dict: Callable,
):
sample_project = build_project_dict()
sample_priority = build_priority_dict()
other_priority1 = build_priority_dict()
other_priority2 = build_priority_dict()
# It will first try to get the priority by ID
get_priority_by_id_response = mock_httpx_response(404, {})
# When it tries to get the priority by name, it will first query the project data
get_project_by_id_response = mock_httpx_response(200, sample_project)
# Then it will query the priorities available to the project
mock_find_priorities_by_project.return_value = {
"project": sample_project,
"priorities_available": [sample_priority, other_priority1, other_priority2],
}
mock_httpx_client.get.side_effect = [
get_priority_by_id_response,
get_project_by_id_response,
]
response = await find_unique_priority(
mock_context, sample_priority["name"].lower(), sample_project["id"]
)
assert response == clean_priority_dict(sample_priority)
@pytest.mark.asyncio
@patch("arcade_jira.tools.priorities.find_priorities_by_project")
async def test_find_unique_priority_by_name_with_multiple_priorities_and_multiple_matches(
mock_find_priorities_by_project,
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_priority_dict: Callable,
):
sample_project = build_project_dict()
sample_priority = build_priority_dict()
other_priority1 = build_priority_dict(name=sample_priority["name"])
other_priority2 = build_priority_dict()
# It will first try to get the priority by ID
get_priority_by_id_response = mock_httpx_response(404, {})
# When it tries to get the priority by name, it will first query the project data
get_project_by_id_response = mock_httpx_response(200, sample_project)
# Then it will query the priorities available to the project
mock_find_priorities_by_project.return_value = {
"project": sample_project,
"priorities_available": [sample_priority, other_priority1, other_priority2],
}
mock_httpx_client.get.side_effect = [
get_priority_by_id_response,
get_project_by_id_response,
]
with pytest.raises(MultipleItemsFoundError) as exc:
await find_unique_priority(
mock_context, sample_priority["name"].lower(), sample_project["id"]
)
assert sample_priority["id"] in exc.value.message
assert other_priority1["id"] in exc.value.message
assert other_priority2["id"] not in exc.value.message

View file

@ -0,0 +1,95 @@
from typing import Callable
import pytest
from arcade.sdk import ToolContext
from arcade_jira.exceptions import MultipleItemsFoundError, NotFoundError
from arcade_jira.utils import clean_project_dict, find_unique_project
@pytest.mark.asyncio
async def test_find_unique_project_by_id_success(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
fake_cloud_name: str,
):
sample_project = build_project_dict()
project_response = mock_httpx_response(200, sample_project)
mock_httpx_client.get.return_value = project_response
response = await find_unique_project(mock_context, sample_project["id"])
assert response == clean_project_dict(sample_project, fake_cloud_name)
@pytest.mark.asyncio
async def test_find_unique_project_by_name_with_a_single_match(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_project_search_response_dict: Callable,
fake_cloud_name: str,
):
sample_project = build_project_dict()
get_project_by_id_response = mock_httpx_response(404, {})
get_projects_without_id_response = mock_httpx_response(
200, build_project_search_response_dict([sample_project])
)
mock_httpx_client.get.side_effect = [
get_project_by_id_response,
get_projects_without_id_response,
]
response = await find_unique_project(mock_context, sample_project["name"].lower())
assert response == clean_project_dict(sample_project, fake_cloud_name)
@pytest.mark.asyncio
async def test_find_unique_project_by_name_with_multiple_matches(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_project_search_response_dict: Callable,
generate_random_str: Callable,
):
project_name = generate_random_str()
sample_projects = [
build_project_dict(name=project_name),
build_project_dict(name=project_name),
]
get_project_by_id_response = mock_httpx_response(404, {})
search_projects_response = mock_httpx_response(
200, build_project_search_response_dict(sample_projects)
)
mock_httpx_client.get.side_effect = [
get_project_by_id_response,
search_projects_response,
]
with pytest.raises(MultipleItemsFoundError) as exc:
await find_unique_project(mock_context, sample_projects[0]["name"].lower())
assert sample_projects[0]["id"] in exc.value.message
assert sample_projects[1]["id"] in exc.value.message
@pytest.mark.asyncio
async def test_find_unique_project_by_name_without_matches(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
generate_random_str: Callable,
build_project_search_response_dict: Callable,
):
get_project_by_id_response = mock_httpx_response(404, {})
search_projects_response = mock_httpx_response(200, build_project_search_response_dict([]))
mock_httpx_client.get.side_effect = [
get_project_by_id_response,
search_projects_response,
]
with pytest.raises(NotFoundError):
await find_unique_project(mock_context, generate_random_str())

View file

@ -0,0 +1,190 @@
from typing import Callable
import pytest
from arcade.sdk import ToolContext
from arcade_jira.exceptions import MultipleItemsFoundError, NotFoundError
from arcade_jira.utils import (
clean_user_dict,
find_multiple_unique_users,
find_unique_user,
)
@pytest.mark.asyncio
async def test_find_unique_user_by_id_success(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_user_dict: Callable,
fake_cloud_name: str,
):
sample_user = build_user_dict()
user_response = mock_httpx_response(200, sample_user)
mock_httpx_client.get.return_value = user_response
response = await find_unique_user(mock_context, sample_user["accountId"])
assert response == clean_user_dict(sample_user, fake_cloud_name)
@pytest.mark.asyncio
async def test_find_unique_user_by_name_with_a_single_match(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_user_dict: Callable,
fake_cloud_name: str,
):
sample_user = build_user_dict()
get_user_by_id_response = mock_httpx_response(404, {})
get_users_without_id_response = mock_httpx_response(200, [sample_user])
mock_httpx_client.get.side_effect = [get_user_by_id_response, get_users_without_id_response]
response = await find_unique_user(mock_context, sample_user["displayName"].lower())
assert response == clean_user_dict(sample_user, fake_cloud_name)
@pytest.mark.asyncio
async def test_find_unique_user_by_name_with_multiple_matches(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_user_dict: Callable,
generate_random_str: Callable,
):
user_name = generate_random_str()
sample_users = [
build_user_dict(display_name=user_name),
build_user_dict(display_name=user_name),
]
get_user_by_id_response = mock_httpx_response(404, {})
get_users_without_id_response = mock_httpx_response(200, sample_users)
mock_httpx_client.get.side_effect = [get_user_by_id_response, get_users_without_id_response]
with pytest.raises(MultipleItemsFoundError) as exc:
await find_unique_user(mock_context, sample_users[0]["displayName"].lower())
assert sample_users[0]["accountId"] in exc.value.message
assert sample_users[1]["accountId"] in exc.value.message
@pytest.mark.asyncio
async def test_find_unique_user_by_name_without_matches(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
generate_random_str: Callable,
):
get_user_by_id_response = mock_httpx_response(404, {})
get_users_without_id_response = mock_httpx_response(200, [])
mock_httpx_client.get.side_effect = [get_user_by_id_response, get_users_without_id_response]
with pytest.raises(NotFoundError):
await find_unique_user(mock_context, generate_random_str())
@pytest.mark.asyncio
async def test_find_multiple_users_when_all_names_match_one_result(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_user_dict: Callable,
fake_cloud_name: str,
):
user1 = build_user_dict()
user2 = build_user_dict()
mock_httpx_client.get.side_effect = [
mock_httpx_response(200, [user1]),
mock_httpx_response(200, [user2]),
]
response = await find_multiple_unique_users(
mock_context, [user1["displayName"], user2["displayName"]]
)
assert response == [
clean_user_dict(user1, fake_cloud_name),
clean_user_dict(user2, fake_cloud_name),
]
@pytest.mark.asyncio
async def test_find_multiple_users_when_a_name_match_multiple_results(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_user_dict: Callable,
):
user1 = build_user_dict()
user2 = build_user_dict()
user3 = build_user_dict()
mock_httpx_client.get.side_effect = [
mock_httpx_response(200, [user1]),
mock_httpx_response(200, [user2, user3]),
]
with pytest.raises(MultipleItemsFoundError) as exc:
await find_multiple_unique_users(mock_context, [user1["displayName"], user2["displayName"]])
assert user2["accountId"] in exc.value.message
assert user3["accountId"] in exc.value.message
@pytest.mark.asyncio
async def test_find_multiple_users_when_user_is_not_found_by_name_but_found_by_id(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_user_dict: Callable,
fake_cloud_name: str,
):
user1 = build_user_dict()
user2 = build_user_dict()
mock_httpx_client.get.side_effect = [
mock_httpx_response(200, [user1]),
mock_httpx_response(200, []),
mock_httpx_response(200, user2),
]
response = await find_multiple_unique_users(
mock_context, [user1["displayName"], user2["accountId"]]
)
assert response == [
clean_user_dict(user1, fake_cloud_name),
clean_user_dict(user2, fake_cloud_name),
]
@pytest.mark.asyncio
async def test_find_multiple_users_when_various_users_are_not_found_by_name_but_found_by_id(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_user_dict: Callable,
fake_cloud_name: str,
):
user1 = build_user_dict()
user2 = build_user_dict()
user3 = build_user_dict()
mock_httpx_client.get.side_effect = [
mock_httpx_response(200, [user1]),
mock_httpx_response(200, []),
mock_httpx_response(200, []),
mock_httpx_response(200, user2),
mock_httpx_response(200, user3),
]
response = await find_multiple_unique_users(
mock_context, [user1["displayName"], user2["accountId"], user3["accountId"]]
)
assert response == [
clean_user_dict(user1, fake_cloud_name),
clean_user_dict(user2, fake_cloud_name),
clean_user_dict(user3, fake_cloud_name),
]

View file

@ -0,0 +1,166 @@
from typing import Callable
import pytest
from arcade.sdk import ToolContext
from arcade_jira.exceptions import JiraToolExecutionError
from arcade_jira.tools.priorities import list_projects_associated_with_a_priority_scheme
from arcade_jira.utils import add_pagination_to_response, clean_project_dict, paginate_all_items
@pytest.mark.asyncio
async def test_paginate_all_items_with_zero_matches(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_search_response_dict: Callable,
):
response = mock_httpx_response(200, build_project_search_response_dict([], is_last=True))
mock_httpx_client.get.return_value = response
response = await paginate_all_items(
context=mock_context,
tool=list_projects_associated_with_a_priority_scheme,
response_items_key="projects",
scheme_id="123",
)
assert response == []
@pytest.mark.asyncio
async def test_paginate_all_items_with_one_page(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_project_search_response_dict: Callable,
fake_cloud_name: str,
):
projects = [build_project_dict(), build_project_dict()]
response = mock_httpx_response(200, build_project_search_response_dict(projects, is_last=True))
mock_httpx_client.get.return_value = response
response = await paginate_all_items(
context=mock_context,
tool=list_projects_associated_with_a_priority_scheme,
response_items_key="projects",
scheme_id="123",
)
assert response == [clean_project_dict(project, fake_cloud_name) for project in projects]
@pytest.mark.asyncio
async def test_paginate_all_items_with_multiple_pages(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_dict: Callable,
build_project_search_response_dict: Callable,
fake_cloud_name: str,
):
page1 = [build_project_dict(), build_project_dict()]
page2 = [build_project_dict(), build_project_dict()]
page3 = [build_project_dict()]
response1 = mock_httpx_response(200, build_project_search_response_dict(page1, is_last=False))
response2 = mock_httpx_response(200, build_project_search_response_dict(page2, is_last=False))
response3 = mock_httpx_response(200, build_project_search_response_dict(page3, is_last=True))
mock_httpx_client.get.side_effect = [response1, response2, response3]
response = await paginate_all_items(
context=mock_context,
tool=list_projects_associated_with_a_priority_scheme,
response_items_key="projects",
scheme_id="123",
limit=2,
)
assert response == [
clean_project_dict(project, fake_cloud_name) for project in page1 + page2 + page3
]
@pytest.mark.asyncio
async def test_paginate_all_items_when_tool_returns_error(
mock_context: ToolContext,
mock_httpx_client,
mock_httpx_response: Callable,
build_project_search_response_dict: Callable,
):
project_id = "456"
get_project_by_id_response = mock_httpx_response(404, {})
search_projects_response = mock_httpx_response(200, build_project_search_response_dict([]))
mock_httpx_client.get.side_effect = [get_project_by_id_response, search_projects_response]
with pytest.raises(JiraToolExecutionError) as exc:
await paginate_all_items(
context=mock_context,
tool=list_projects_associated_with_a_priority_scheme,
response_items_key="projects",
scheme_id="123",
project=project_id,
limit=2,
)
assert exc.value.message == f"Project not found with name/key/ID '{project_id}'"
def test_add_pagination_to_response_with_zero_items():
response = {"items": []}
items = []
limit = 10
offset = 0
add_pagination_to_response(response, items, limit, offset)
assert response["pagination"] == {"is_last_page": True, "limit": limit, "total_results": 0}
def test_add_pagination_to_response_with_last_page_false():
items = [{"id": "123"}, {"id": "456"}]
response = {"items": items, "isLast": False}
limit = 2
offset = 0
add_pagination_to_response(response, items, limit, offset)
assert response["pagination"] == {
"limit": limit,
"total_results": 2,
"next_offset": 2,
}
def test_add_pagination_to_response_with_last_page_true():
items = [{"id": "123"}, {"id": "456"}]
response = {"items": items, "isLast": True}
limit = 2
offset = 0
add_pagination_to_response(response, items, limit, offset)
assert response["pagination"] == {
"limit": limit,
"total_results": 2,
"is_last_page": True,
}
def test_add_pagination_to_response_without_last_page_and_limit_equal_items():
items = [{"id": "123"}, {"id": "456"}]
response = {"items": items}
limit = 2
offset = 0
add_pagination_to_response(response, items, limit, offset)
assert response["pagination"] == {
"limit": limit,
"total_results": 2,
"next_offset": 2,
}
def test_add_pagination_to_response_without_last_page_and_less_items_than_limit():
items = [{"id": "123"}]
response = {"items": items}
limit = 2
offset = 0
add_pagination_to_response(response, items, limit, offset)
assert response["pagination"] == {
"limit": limit,
"total_results": 1,
"is_last_page": True,
}