CrewAI Arcade (#253)

# CrewAI Integration
crewai-arcade enables you to add Arcade tools and Arcade Auth into your
CrewAI applications. Just create an `ArcadeToolManager` and add your
tools to your CrewAI Agent/Tasks.

## Initializing the ArcadeToolManager
There are two main ways to initialize your `ArcadeToolManager`
1. Default handling of tool authorization and execution:
    ```py
    """
    When you provide a user id to the ArcadeToolManger, 
    it will handle the tool authorization and tool execution for you
    """
manager = ArcadeToolManager(default_user_id="me@example.com,
api_key="...")
    ```
2. Custom handling of tool authorization and execution
    ```py
    """
    Provide a callback function to the `ArcadeToolManager` that handles 
tool authorization and tool execution. The callback function will be
called whenever your CrewAI
    application wants to call a tool.
    """
    
    
    def custom_tool_executor(
manager: ArcadeToolManager, tool_name: str, **tool_input: dict[str, Any]
    ) -> Any:
        """Custom tool executor for the ArcadeToolManager
    
ArcadeToolManager's default executor handles authorization and tool
execution.
This function overrides the default executor to handle authorization and
tool execution
        in a custom way.
        """
        # Your custom tool auth logic goes here
        # Your custom tool execution logic goes here
        ...
    
manager = ArcadeToolManager(executor=custom_tool_executor,
api_key="...")
    ```

## Tool Registration
1. Initialize the tools in the manager
    ```py
    """
Clears any existing tools in the manager and replaces them with tools
and toolkits that are provided.
    """
    manager.init_tools(tools=["Google.ListEmails"], toolkits=["Slack"])
    ```
2. Add tools to the manager
    ```py
    """
    Adds tools and toolkits to the manager's internal tool list.
    """
    manager.add_tools(tools=["Google.ListEmails"], toolkits=["Slack"])
    ```
3. Retrieve tools and toolkits from the manager
    ```py
    """
    Retrieves the provided tools and toolkits as CrewAI StructuredTools.
    """
    manager.get_tools(tools=["Google.ListEmails"], toolkits=["Slack"])
    ```
    
 ## Auth Helpers
The `ArcadeToolManager` provides multiple helper methods for when you
need to create
a custom auth flow.
1. `authorize_tool` handles the whole authorization flow for you. This
is used internally when a custom auth flow is not needed.
2. `requires_auth(tool_name)` checks if the provided tool has
authorization requirements.
3. `authorize(tool_name, user_id)` authorizes the use of the provided
tool for the provided user ID
4. `is_authorized(tool_name, user_id)` checks if a tool is authorized
for use by the provided user ID
5. `wait_for_auth(auth_response)` waits for an authorization process to
complete before returning

## Tool Execution Helpers
1. `execute_tool` handles the whole tool execution flow for you. This is
used internally when a custom tool execution flow is not needed.

---------

Co-authored-by: lgesuellip <lgesuellipinto@uade.edu.ar>
Co-authored-by: lpetralli <123559656+lpetralli@users.noreply.github.com>
Co-authored-by: lgesuellip <102637283+lgesuellip@users.noreply.github.com>
Co-authored-by: “lgesuellip” <“lgesuellipinto@uade.edu.ar”>
This commit is contained in:
Eric Gustin 2025-02-19 15:02:42 -08:00 committed by GitHub
parent 8efa9a51df
commit 1e0def78df
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1173 additions and 0 deletions

21
contrib/crewai/LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025, Arcade AI
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

62
contrib/crewai/Makefile Normal file
View file

@ -0,0 +1,62 @@
VERSION ?= "0.1.1"
.PHONY: install
install: ## Install the poetry environment and install the pre-commit hooks
@if ! command -v poetry >/dev/null 2>&1; then \
echo "🚫 Poetry is not installed. Please install poetry before proceeding."; \
exit 1; \
fi
@echo "🚀 Creating virtual environment using pyenv and poetry"
@poetry install --all-extras
@poetry run pre-commit install
.PHONY: check
check: ## Run code quality tools.
@echo "🚀 Checking Poetry lock file consistency with 'pyproject.toml': Running poetry check --lock"
@poetry check --lock
@echo "🚀 Linting code: Running pre-commit"
@poetry run pre-commit run -a
@echo "🚀 Static type checking: Running mypy"
@poetry run mypy $(git ls-files '*.py')
.PHONY: test
test: ## Test the code with pytest
@echo "🚀 Testing code: Running pytest"
@poetry run pytest -W ignore -v --cov --cov-config=pyproject.toml --cov-report=xml
.PHONY: set-version
set-version: ## Set the version in the pyproject.toml file
@echo "🚀 Setting version in pyproject.toml"
@poetry version $(VERSION)
.PHONY: unset-version
unset-version: ## Set the version in the pyproject.toml file
@echo "🚀 Setting version in pyproject.toml"
@poetry version 0.1.0
.PHONY: build
build: clean-build ## Build wheel file using poetry
@echo "🚀 Creating wheel file"
@poetry build
.PHONY: clean-build
clean-build: ## clean build artifacts
@rm -rf dist
.PHONY: publish
publish: ## publish a release to pypi.
@echo "🚀 Publishing: Dry run."
@poetry config pypi-token.pypi $(PYPI_TOKEN)
@poetry publish --dry-run
@echo "🚀 Publishing."
@poetry publish
.PHONY: build-and-publish
build-and-publish: build publish ## Build and publish.
.PHONY: help
help:
@echo "🛠️ Arcade AI Dev Commands:\n"
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
.DEFAULT_GOAL := help

38
contrib/crewai/README.md Normal file
View file

@ -0,0 +1,38 @@
<h3 align="center">
<a name="readme-top"></a>
<img
src="https://docs.arcade.dev/images/logo/arcade-logo.png"
>
</h3>
<div align="center">
<h3>CrewAI Integration</h3>
<a href="https://github.com/arcadeai/arcade-ai/blob/main/LICENSE">
<img src="https://img.shields.io/badge/License-MIT-yellow.svg" alt="License">
</a>
<a href="https://pepy.tech/project/crewai-arcade">
<img src="https://static.pepy.tech/badge/crewai-arcade" alt="Downloads">
</a>
</div>
<p align="center">
<a href="https://docs.arcade.dev" target="_blank">Docs</a>
<a href="https://docs.arcade.dev/toolkits" target="_blank">Toolkits</a>
<a href="https://github.com/ArcadeAI/cookbook" target="_blank">Cookbook</a>
<a href="https://github.com/ArcadeAI/arcade-py" target="_blank">Python Client</a>
<a href="https://github.com/ArcadeAI/arcade-js" target="_blank">JavaScript Client</a>
</p>
## Overview
`crewai-arcade` allows you to use Arcade tools in your CrewAI applications.
## Installation
```bash
pip install crewai-arcade
```
## Usage
See the [examples](https://github.com/ArcadeAI/arcade-ai/tree/main/examples/crewai) for usage examples

View file

@ -0,0 +1,3 @@
from .manager import ArcadeToolManager
__all__ = ["ArcadeToolManager"]

View file

@ -0,0 +1,56 @@
from typing import Any
from arcadepy.types import ToolDefinition
from pydantic import BaseModel, Field, create_model
# Mapping of Arcade value types to Python types
TYPE_MAPPING = {
"string": str,
"number": float,
"integer": int,
"boolean": bool,
"array": list,
"json": dict,
}
def get_python_type(val_type: str) -> Any:
"""Map Arcade value types to Python types.
Args:
val_type: The value type as a string.
Returns:
Corresponding Python type.
"""
_type = TYPE_MAPPING.get(val_type)
if _type is None:
raise ValueError(f"Invalid value type: {val_type}")
return _type
def tool_definition_to_pydantic_model(tool_def: ToolDefinition) -> type[BaseModel]:
"""Convert a ToolDefinition's inputs into a Pydantic BaseModel.
Args:
tool_def: The ToolDefinition object to convert.
Returns:
A Pydantic BaseModel class representing the tool's input schema.
"""
try:
fields: dict[str, Any] = {}
for param in tool_def.input.parameters or []:
param_type = get_python_type(param.value_schema.val_type)
if param_type == list and param.value_schema.inner_val_type: # noqa: E721
inner_type: type[Any] = get_python_type(param.value_schema.inner_val_type)
param_type = list[inner_type] # type: ignore[valid-type]
param_description = param.description or "No description provided."
default = ... if param.required else None
fields[param.name] = (
param_type,
Field(default=default, description=param_description),
)
return create_model(f"{tool_def.name}Args", **fields)
except ValueError as e:
raise ValueError(f"Error converting {tool_def.name} parameters into pydantic model: {e}")

View file

@ -0,0 +1,363 @@
from collections.abc import Iterator
from typing import Any, Callable, Optional, Protocol
from arcadepy import Arcade
from arcadepy.types import ToolDefinition
from arcadepy.types.shared import AuthorizationResponse
from crewai_arcade._utilities import tool_definition_to_pydantic_model
from crewai_arcade.structured import StructuredTool
TOOL_NAME_SEPARATOR = "_"
class ArcadeToolExecutorProtocol(Protocol):
"""Protocol for Arcade tool executor callback."""
def __call__(
self,
manager: "ArcadeToolManager",
name: str,
**input: dict[str, Any], # noqa: A002
) -> Any: ...
class ArcadeToolManager:
"""Arcade tool manager for CrewAI
Wraps Arcade tools as CrewAI StructuredTools
"""
def __init__(
self,
client: Optional[Arcade] = None,
executor: Optional[ArcadeToolExecutorProtocol] = None,
*,
default_user_id: Optional[str] = None,
**kwargs: dict[str, Any],
) -> None:
"""Initialize the ArcadeToolManager.
Example:
>>> manager = ArcadeToolManager(default_user_id="me@example.com", api_key="...")
>>>
>>> # retrieve a specific Arcade tool as a CrewAI tool and add it to the manager
>>> manager.get_tools(tools=["Search.SearchGoogle"])
>>>
>>> # retrieve all Arcade tools in a toolkit as CrewAI tools and add them to the manager
>>> manager.get_tools(toolkits=["Search"])
>>>
>>> # retrieve all tools in the manager as CrewAI tools
>>> manager.get_tools()
>>>
>>> # clear and initialize new tools in the manager
>>> manager.init_tools(tools=["Search.SearchGoogle"], toolkits=["Search"])
Args:
client: Arcade client instance.
executor: Optional custom executor callback. Useful for customizing the authorization and execution flow.
default_user_id: The default user id used for tool authorization and execution
when no custom executor is provided.
**kwargs: Additional keyword arguments for the Arcade client if the client is not provided.
Note:
If no executor is provided, `default_user_id` must be specified so that the default
executor can call authorize and execute with that id.
"""
if not client:
api_key = kwargs.get("api_key")
base_url = kwargs.get("base_url")
arcade_kwargs = {"api_key": api_key, "base_url": base_url, **kwargs}
client = Arcade(**arcade_kwargs) # type: ignore[arg-type]
self._client = client
self._tools: dict[str, ToolDefinition] = {}
self.default_user_id = default_user_id
# Use the default executor if none is provided.
if executor is None and default_user_id is None:
raise ValueError("A default_user_id must be provided if no executor is specified.")
self.executor = executor or self._default_executor
if not callable(self.executor):
raise TypeError(
"executor must be callable and adhere to the ArcadeToolExecutorProtocol signature"
)
@property
def tools(self) -> list[str]:
return list(self._tools.keys())
def __iter__(self) -> Iterator[tuple[str, ToolDefinition]]:
yield from self._tools.items()
def __len__(self) -> int:
return len(self._tools)
def __getitem__(self, tool_name: str) -> ToolDefinition:
return self._tools[tool_name]
def init_tools(
self,
tools: Optional[list[str]] = None,
toolkits: Optional[list[str]] = None,
) -> None:
"""Initialize the tools in the manager.
This method clears any existing tools in the manager and replaces them
with tools and toolkits that are provided. If no tools or toolkits are
provided, then all tools in the Arcade client will be added.
Example:
>>> manager = ArcadeToolManager(default_user_id="me@example.com", api_key="...")
>>> manager.init_tools(tools=["Search.SearchGoogle"])
>>> manager.get_tools()
Args:
tools: Optional list of tool names to include.
toolkits: Optional list of toolkits to include.
"""
self._tools = self._retrieve_tool_definitions(tools, toolkits)
def add_tools(
self, tools: Optional[list[str]] = None, toolkits: Optional[list[str]] = None
) -> None:
"""Add tools to the manager.
This method adds tools to the manager's internal tool list. If no tools or
toolkits are provided, all tools in the Arcade client will be added.
Example:
>>> manager = ArcadeToolManager(default_user_id="me@example.com", api_key="...")
>>> manager.init_tools(tools=["Search.SearchGoogle"])
>>> manager.add_tools(tools=["Google.ListEmails"], toolkits=["Slack"])
>>> manager.get_tools()
Args:
tools: List of tool names to add.
toolkits: List of toolkits to add tools from.
"""
new_tool_definitions = self._retrieve_tool_definitions(tools, toolkits)
self._tools.update(new_tool_definitions)
def get_tools(
self, tools: Optional[list[str]] = None, toolkits: Optional[list[str]] = None
) -> list[StructuredTool]:
"""Retrieves the requested tools or toolkits from the manager.
This method retrieves the provided tools or toolkits as CrewAI StructuredTools.
If any provided tools or toolkits are not already present in the
internal tool list, then they are added to the manager.
If no tools or toolkits are provided, then all tools in the manager's
internal tool list are returned as CrewAI StructuredTools.
Example:
>>> manager = ArcadeToolManager(default_user_id="me@example.com", api_key="...")
>>>
>>> # Retrieve a specific tool as a CrewAI tool
>>> manager.get_tools(tools=["Search.SearchGoogle"])
>>>
>>> # Retrieve all tools in a toolkit as CrewAI tools
>>> manager.get_tools(toolkits=["Search"])
>>>
>>> # Retrieve all tools in the manager as CrewAI tools
>>> manager.get_tools()
Args:
tools: An optional list of tool names to retrieve and wrap. If any of these
tools are missing from the internal list, they will be added.
toolkits: An optional list of toolkits from which to retrieve and wrap tools.
Tools from these toolkits will be added if they are not already present.
Returns:
A list of StructuredTool instances adapted from the specified tools.
"""
if tools or toolkits:
if len(self) == 0:
self.init_tools(tools, toolkits)
else:
new_tools = self._retrieve_tool_definitions(tools, toolkits)
self._tools.update(new_tools)
# Wrap the requested tools as CrewAI StructuredTools
crewai_tools: list[StructuredTool] = []
for tool_name, tool_def in self:
crewai_tools.append(self._wrap_arcade_tool(tool_name, tool_def))
return crewai_tools
def authorize_tool(self, user_id: str, name: str) -> None:
"""Handle the authorization flow.
Args:
user_id: The user ID to authorize the tool for.
name: The name of the tool to authorize.
"""
if self.requires_auth(name):
# Get authorization status
auth_response = self.authorize(name, user_id)
if not self.is_authorized(auth_response.id): # type: ignore[arg-type]
# Handle authorization
print(f"Please use the following link to authorize: {auth_response.url}")
auth_response = self.wait_for_auth(auth_response)
# Ensure authorization completed successfully
if not self.is_authorized(auth_response.id): # type: ignore[arg-type]
raise ValueError(f"Authorization failed for {name}. URL: {auth_response.url}")
def execute_tool(self, user_id: str, name: str, **input: Any) -> Any: # noqa: A002
"""Handle the tool execution flow.
Args:
user_id: The user ID to execute the tool for.
name: The name of the tool to execute.
**input: Dictionary of input arguments for the tool.
Returns:
The output of the tool.
"""
response = self._client.tools.execute(
tool_name=name,
input=input,
user_id=user_id,
)
tool_error = response.output.error if response.output else None
if tool_error:
return str(tool_error)
if response.success:
return response.output.value # type: ignore[union-attr]
return "Failed to call " + name
def requires_auth(self, tool_name: str) -> bool:
"""Check if a tool requires authorization."""
cleaned_tool_name = tool_name.replace(".", TOOL_NAME_SEPARATOR)
tool_def = self._tools.get(cleaned_tool_name)
if tool_def is None:
raise ValueError(f"Tool '{tool_name}' not found in this ArcadeToolManager instance")
if tool_def.requirements is None:
return False
return tool_def.requirements.authorization is not None
def authorize(self, tool_name: str, user_id: str) -> AuthorizationResponse:
"""Authorize a user for a tool.
Args:
tool_name: The name of the tool to authorize.
user_id: The user ID to authorize.
Returns:
AuthorizationResponse
"""
return self._client.tools.authorize(tool_name=tool_name, user_id=user_id)
def is_authorized(self, authorization_id: str) -> bool:
"""Check if a tool authorization is complete."""
return self._client.auth.status(id=authorization_id).status == "completed"
def wait_for_auth(self, auth_response: AuthorizationResponse) -> AuthorizationResponse:
"""Wait for an authorization process to complete.
Args:
auth_response: The authorization response from the initial authorize call.
Returns:
AuthorizationResponse with completed status
"""
return self._client.auth.wait_for_completion(auth_response)
def _wrap_arcade_tool(self, name: str, tool_def: ToolDefinition) -> StructuredTool:
"""Wrap an Arcade tool as a CrewAI StructuredTool.
Args:
name: The name of the tool to wrap.
tool_def: The definition of the tool to wrap.
Returns:
A StructuredTool instance.
"""
description = tool_def.description or "No description provided."
args_schema = tool_definition_to_pydantic_model(tool_def)
tool_function = self._create_tool_function(name)
return StructuredTool.from_function(
func=tool_function,
name=name,
description=description,
args_schema=args_schema,
)
def _retrieve_tool_definitions(
self, tools: Optional[list[str]] = None, toolkits: Optional[list[str]] = None
) -> dict[str, ToolDefinition]:
"""Retrieve tool definitions from the Arcade client.
This method fetches tool definitions based on the provided tool names or toolkits.
If no specific tools or toolkits are provided, the method will fetch and return
all tools available in the Arcade client.
Args:
tools: Optional list of tool names to retrieve.
toolkits: Optional list of toolkits to retrieve tools from.
Returns:
A dictionary mapping full tool names to their corresponding ToolDefinition objects
"""
all_tools: list[ToolDefinition] = []
if tools:
single_tools = [self._client.tools.get(name=tool_id) for tool_id in tools]
all_tools.extend(single_tools)
if toolkits:
for tk in toolkits:
all_tools.extend(self._client.tools.list(toolkit=tk))
if not tools and not toolkits:
# Retrieve all Arcade tools.
page_iterator = self._client.tools.list()
all_tools.extend(page_iterator)
tool_definitions: dict[str, ToolDefinition] = {}
for tool in all_tools:
full_tool_name = f"{tool.toolkit.name}{TOOL_NAME_SEPARATOR}{tool.name}"
tool_definitions[full_tool_name] = tool
return tool_definitions
def _create_tool_function(self, tool_name: str) -> Callable[..., Any]:
"""Creates a function wrapper for an Arcade tool.
Args:
tool_name: The name of the tool to create a function for.
Returns:
A callable function that executes the tool.
"""
def tool_function(**kwargs: Any) -> Any:
return self.executor(self, tool_name, **kwargs)
return tool_function
@staticmethod
def _default_executor(
manager: "ArcadeToolManager", name: str, **tool_input: dict[str, Any]
) -> Any:
"""
Default executor that performs authorization followed
by tool execution using the manager's default_user_id.
"""
if manager.default_user_id is None:
raise ValueError("default_user_id is not set in ArcadeToolManager.")
user_id = manager.default_user_id
manager.authorize_tool(user_id, name)
return manager.execute_tool(user_id, name, **tool_input)

View file

View file

@ -0,0 +1,54 @@
from textwrap import dedent
from typing import Any, Callable, Optional
from crewai.tools.base_tool import BaseTool
from pydantic import BaseModel as PydanticBaseModel
class StructuredTool(BaseTool): # type: ignore[no-any-unimported]
"""A tool that executes functions with structured inputs using a schema."""
func: Callable[..., Any]
"""The callable function that implements the tool's functionality."""
def _run(self, *args: Any, **kwargs: Any) -> Any:
"""Execute the tool's function with the provided arguments."""
return self.func(*args, **kwargs)
@classmethod
def from_function(
cls,
func: Callable,
args_schema: type[PydanticBaseModel],
name: Optional[str] = None,
description: Optional[str] = None,
**kwargs: Any,
) -> "StructuredTool":
"""Create a new StructuredTool instance from a function.
Args:
func: Function to wrap as a CrewAI tool
name: Custom name for the tool
description: Custom description of the tool's functionality
args_schema: Pydantic model defining the expected argument structure
**kwargs: Additional tool configuration parameters, like cache_function
Returns:
StructuredTool: A new tool instance wrapping the provided function
"""
name = name or func.__name__
if description is None:
description = func.__doc__
if description is None:
raise ValueError("Function must have a docstring if description not provided.")
# Clean up docstring
description = dedent(description).strip()
return cls(
func=func,
args_schema=args_schema,
name=name,
description=description,
**kwargs,
)

View file

@ -0,0 +1,45 @@
[tool.poetry]
name = "crewai-arcade"
version = "0.1.1"
description = "An integration package connecting Arcade and CrewAI"
authors = ["Arcade <dev@arcade.dev>"]
readme = "README.md"
repository = "https://github.com/arcadeai/arcade-ai/tree/main/contrib/crewai"
license = "MIT"
[tool.poetry.dependencies]
python = ">=3.10,<3.13"
crewai = ">=0.1.0,<1.0.0"
pydantic = "^2.0.0"
arcadepy = "^1.0.0"
[tool.poetry.group.dev.dependencies]
pytest = "^8.1.2"
pytest-cov = "^4.0.0"
mypy = "^1.5.1"
pre-commit = "^3.4.0"
tox = "^4.11.1"
[tool.mypy]
files = ["crewai_arcade"]
python_version = "3.10"
disallow_untyped_defs = "True"
disallow_any_unimported = "True"
no_implicit_optional = "True"
check_untyped_defs = "True"
warn_return_any = "True"
warn_unused_ignores = "True"
show_error_codes = "True"
ignore_missing_imports = "True"
[tool.pytest.ini_options]
testpaths = ["tests"]
[tool.coverage.run]
branch = true
source = ["crewai_arcade"]
[tool.coverage.report]
skip_empty = true

View file

View file

@ -0,0 +1,258 @@
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from arcadepy.types import ToolDefinition
from crewai_arcade.manager import TOOL_NAME_SEPARATOR, ArcadeToolManager
# --- Custom executor ---
def custom_executor(manager: ArcadeToolManager, name: str, **tool_input: dict[str, Any]) -> Any:
"""Custom executor for testing purposes."""
return "Tool executed"
# --- Fixtures ---
@pytest.fixture
def mock_client():
"""Create a fake Arcade client fixture."""
return MagicMock()
@pytest.fixture
def manager_with_default_executor(mock_client):
"""Return an ArcadeToolManager with a test default_user_id and fake client."""
return ArcadeToolManager(default_user_id="test_user", client=mock_client)
@pytest.fixture
def manager_with_custom_executor(mock_client):
"""Return an ArcadeToolManager with a test default_user_id and fake client using a custom executor."""
return ArcadeToolManager(
default_user_id="test_user", client=mock_client, executor=custom_executor
)
@pytest.fixture
def fake_tool_definition():
"""Return a fake tool definition for testing purposes."""
fake_tool = MagicMock(spec=ToolDefinition)
fake_tool.name = "SearchGoogle"
fake_tool.description = "Test tool description"
fake_tool.toolkit = MagicMock()
fake_tool.toolkit.name = "Search"
fake_tool.requirements = None
fake_tool.input = MagicMock()
fake_tool.input.parameters = []
return fake_tool
# --- Tests for _create_tool_function ---
def test_create_tool_function_success_custom(manager_with_custom_executor):
"""
Test that the tool function executes successfully using the custom executor.
The custom executor simply returns "Tool executed".
"""
tool_function = manager_with_custom_executor._create_tool_function("test_tool")
result = tool_function()
assert result == "Tool executed"
def test_create_tool_function_forwards_kwargs_custom(manager_with_custom_executor):
"""
Test that extra keyword arguments are forwarded correctly to the custom executor.
The custom executor ignores the kwargs and returns "Tool executed".
"""
tool_function = manager_with_custom_executor._create_tool_function("test_tool")
result = tool_function(param1="value1", param2=2)
assert result == "Tool executed"
def test_create_tool_function_unauthorized(manager_with_default_executor):
"""
Test that the tool function raises a ValueError when authorization fails using the default executor.
"""
# Mock an authorization failure by having authorize_tool raise ValueError.
manager_with_default_executor.authorize_tool = MagicMock(
side_effect=ValueError("Authorization failed for test_tool")
)
manager_with_default_executor.execute_tool = MagicMock()
tool_function = manager_with_default_executor._create_tool_function("test_tool")
with pytest.raises(ValueError, match="Authorization failed for test_tool"):
tool_function()
manager_with_default_executor.authorize_tool.assert_called_once_with("test_user", "test_tool")
manager_with_default_executor.execute_tool.assert_not_called() # auth fails before this is called
def test_create_tool_function_execution_failure(manager_with_default_executor):
"""
Test that when tool execution returns a failing value, that value is returned.
"""
manager_with_default_executor.authorize_tool = MagicMock() # auth passes
manager_with_default_executor.execute_tool = MagicMock(return_value="error")
tool_function = manager_with_default_executor._create_tool_function("test_tool")
result = tool_function()
assert result == "error"
manager_with_default_executor.authorize_tool.assert_called_once_with("test_user", "test_tool")
manager_with_default_executor.execute_tool.assert_called_once_with("test_user", "test_tool")
# --- Test for _wrap_arcade_tool ---
def test_wrap_arcade_tool(manager_with_default_executor, fake_tool_definition):
"""
Test that _wrap_arcade_tool correctly creates a StructuredTool.
"""
fake_tool_definition.description = "Test tool"
tool_name = "test_tool"
# Patch the conversion utilities. Also, override _create_tool_function to return a dummy function.
with (
patch(
"crewai_arcade.manager.tool_definition_to_pydantic_model", return_value="args_schema"
) as mock_to_model,
patch(
"crewai_arcade.structured.StructuredTool.from_function", return_value="structured_tool"
) as mock_from_function,
patch.object(
manager_with_default_executor,
"_create_tool_function",
return_value=lambda *a, **kw: None,
) as mock_create_tool,
):
result = manager_with_default_executor._wrap_arcade_tool(tool_name, fake_tool_definition)
assert result == "structured_tool"
mock_to_model.assert_called_once_with(fake_tool_definition)
mock_from_function.assert_called_once_with(
func=mock_create_tool.return_value,
name=tool_name,
description="Test tool",
args_schema="args_schema",
)
# --- Tests for tool registration (init_tools, add_tools, get_tools) ---
def test_init_tools_with_tool(manager_with_default_executor, fake_tool_definition):
"""
Test that init_tools clears and initializes the tools in the manager using tool names.
"""
manager_with_default_executor._client.tools.get.return_value = fake_tool_definition
manager_with_default_executor.init_tools(tools=["Search.SearchGoogle"])
expected_key = (
f"{fake_tool_definition.toolkit.name}{TOOL_NAME_SEPARATOR}{fake_tool_definition.name}"
)
assert expected_key in manager_with_default_executor._tools
assert len(manager_with_default_executor._tools) == 1
def test_init_tools_with_toolkit(manager_with_default_executor, fake_tool_definition):
"""
Test that init_tools correctly fetches tools using a toolkit.
"""
# Simulate that listing a toolkit returns a list with the fake tool
manager_with_default_executor._client.tools.list.return_value = [fake_tool_definition]
manager_with_default_executor.init_tools(toolkits=["Search"])
expected_key = (
f"{fake_tool_definition.toolkit.name}{TOOL_NAME_SEPARATOR}{fake_tool_definition.name}"
)
assert expected_key in manager_with_default_executor._tools
assert len(manager_with_default_executor._tools) == 1
def test_init_tools_with_none(manager_with_default_executor, fake_tool_definition):
"""
Test that init_tools with no arguments retrieves all tools.
"""
manager_with_default_executor._client.tools.list.return_value = [fake_tool_definition]
manager_with_default_executor.init_tools()
expected_key = (
f"{fake_tool_definition.toolkit.name}{TOOL_NAME_SEPARATOR}{fake_tool_definition.name}"
)
assert expected_key in manager_with_default_executor._tools
assert len(manager_with_default_executor._tools) == 1
def test_add_tools(manager_with_default_executor, fake_tool_definition):
"""
Test that add_tools supplements the manager's existing tool dictionary.
"""
# Set an initial tool in _tools.
fake_initial_tool = MagicMock(spec=ToolDefinition)
fake_initial_tool.name = "InitialTool"
fake_initial_tool.toolkit = MagicMock()
fake_initial_tool.toolkit.name = "InitialToolkit"
initial_key = f"{fake_initial_tool.toolkit.name}{TOOL_NAME_SEPARATOR}{fake_initial_tool.name}"
manager_with_default_executor._tools[initial_key] = fake_initial_tool
# Mock retrieval of a new tool.
manager_with_default_executor._client.tools.get.return_value = fake_tool_definition
manager_with_default_executor.add_tools(tools=["Search.SearchGoogle"])
new_key = f"{fake_tool_definition.toolkit.name}{TOOL_NAME_SEPARATOR}{fake_tool_definition.name}"
assert initial_key in manager_with_default_executor._tools
assert new_key in manager_with_default_executor._tools
def test_get_tools_with_existing_tools(manager_with_default_executor, fake_tool_definition):
"""
Test that get_tools wraps existing tools if they are already registered.
"""
manager_with_default_executor._client.tools.get.return_value = fake_tool_definition
manager_with_default_executor.init_tools(tools=["Search.SearchGoogle"])
expected_key = (
f"{fake_tool_definition.toolkit.name}{TOOL_NAME_SEPARATOR}{fake_tool_definition.name}"
)
# Patch _wrap_arcade_tool to verify that it is called.
with patch.object(
manager_with_default_executor, "_wrap_arcade_tool", side_effect=lambda name, td: (name, td)
) as mock_wrap:
crewai_tools = manager_with_default_executor.get_tools()
assert len(crewai_tools) == 1
assert crewai_tools[0] == (expected_key, fake_tool_definition)
mock_wrap.assert_called_once_with(expected_key, fake_tool_definition)
def test_get_tools_with_missing_tool_and_toolkit(
manager_with_default_executor, fake_tool_definition
):
"""
Test that get_tools adds missing tools and toolkits when not already registered.
"""
manager_with_default_executor._tools = {}
manager_with_default_executor._client.tools.get.return_value = fake_tool_definition
manager_with_default_executor._client.tools.list.return_value = [fake_tool_definition]
with patch.object(
manager_with_default_executor, "_wrap_arcade_tool", side_effect=lambda name, td: (name, td)
) as mock_wrap:
crewai_tools = manager_with_default_executor.get_tools(
tools=["Search.SearchGoogle"], toolkits=["Search"]
)
expected_key = (
f"{fake_tool_definition.toolkit.name}{TOOL_NAME_SEPARATOR}{fake_tool_definition.name}"
)
assert expected_key in manager_with_default_executor._tools
assert len(crewai_tools) == 1
assert crewai_tools[0] == (expected_key, fake_tool_definition)
mock_wrap.assert_called_once_with(expected_key, fake_tool_definition)

View file

@ -0,0 +1,92 @@
import pytest
from crewai import Agent, Crew, Task
from crewai_arcade.structured import StructuredTool
from pydantic import BaseModel
class CalculatorInput(BaseModel):
x: float
y: float
def add_numbers(x: float, y: float) -> float:
"""Add two numbers together."""
return x + y
def unnamed_function(x: float, y: float) -> float:
return x + y
def test_structured_tool_basic():
# Test basic functionality with explicit name and description
calculator_tool = StructuredTool.from_function(
func=add_numbers,
args_schema=CalculatorInput,
name="Calculator",
description="A tool that adds two numbers together",
)
expected_description = (
"Tool Name: Calculator\n"
"Tool Arguments: {'x': {'description': None, 'type': 'float'}, 'y': {'description': None, 'type': 'float'}}\n"
"Tool Description: A tool that adds two numbers together"
)
assert calculator_tool.description == expected_description
assert calculator_tool.func(2, 3) == 5
def test_structured_tool_auto_name_description():
# Test automatic name and description generation from function
calculator_tool = StructuredTool.from_function(func=add_numbers, args_schema=CalculatorInput)
expected_description = (
"Tool Name: add_numbers\n"
"Tool Arguments: {'x': {'description': None, 'type': 'float'}, 'y': {'description': None, 'type': 'float'}}\n"
"Tool Description: Add two numbers together."
)
assert calculator_tool.description == expected_description
def test_structured_tool_validation_errors():
# Test missing docstring
with pytest.raises(
ValueError, match="Function must have a docstring if description not provided."
):
StructuredTool.from_function(func=unnamed_function, args_schema=CalculatorInput)
def test_structured_tool_in_crew():
calculator_tool = StructuredTool.from_function(
func=add_numbers,
args_schema=CalculatorInput,
)
calculator_agent = Agent(
role="Math Expert",
goal="Perform mathematical calculations accurately",
backstory="An expert mathematician who specializes in calculations",
tools=[calculator_tool],
verbose=True,
cache=False,
)
addition_task = Task(
description="Add the numbers 5 and 3 together",
expected_output="The sum of 5 and 3 is 8",
agent=calculator_agent,
)
crew = Crew(
agents=[calculator_agent],
tasks=[addition_task],
)
# Assert crew structure
assert len(crew.agents) == 1
assert len(crew.tasks) == 1
assert crew.agents[0] == calculator_agent
assert crew.tasks[0] == addition_task
assert crew.tasks[0].agent == calculator_agent
assert len(crew.agents[0].tools) == 1
assert isinstance(crew.agents[0].tools[0], StructuredTool)

View file

@ -0,0 +1,134 @@
"""
This is an example of how to use Arcade with CrewAI.
The ArcadeToolManager allows you to handle both authorization and tool execution in a custom way.
This example demonstrates how to implement a custom auth handler and a custom tool execute handler.
The example assumes the following:
1. You have an Arcade API key and have set the ARCADE_API_KEY environment variable.
2. You have an OpenAI API key and have set the OPENAI_API_KEY environment variable.
3. You have installed the necessary dependencies in the requirements.txt file: `pip install -r requirements.txt`
"""
from typing import Any
from crewai import Agent, Crew, Task
from crewai.crews import CrewOutput
from crewai.llm import LLM
from crewai_arcade import ArcadeToolManager
USER_ID = "user@example.com"
def custom_auth_flow(
manager: ArcadeToolManager, tool_name: str, **tool_input: dict[str, Any]
) -> Any:
"""Custom auth flow for the ArcadeToolManager
This function is called when CrewAI needs to call a tool that requires authorization.
Authorization is handled before executing the tool.
This function overrides the ArcadeToolManager's default auth flow performed by ArcadeToolManager.authorize_tool
"""
print(f"Authorization required for tool: '{tool_name}' with inputs:")
for input_name, input_value in tool_input.items():
print(f" {input_name}: {input_value}")
# Get authorization status
auth_response = manager.authorize(tool_name, USER_ID)
# If the user is not authorized for the tool,
# then we need to handle the authorization before executing the tool
if not manager.is_authorized(auth_response.id):
# Handle authorization
print(f"\nTo authorize, visit: {auth_response.url}")
# Block until the user has completed the authorization
auth_response = manager.wait_for_auth(auth_response)
# Ensure authorization completed successfully
if not manager.is_authorized(auth_response.id):
raise ValueError(f"Authorization failed for {tool_name}. URL: {auth_response.url}")
def custom_execute_flow(
manager: ArcadeToolManager, tool_name: str, **tool_input: dict[str, Any]
) -> Any:
"""Custom tool execution flow for the ArcadeToolManager
This function is called when CrewAI needs to execute a tool after any authorization has been handled.
This function overrides the ArcadeToolManager's default tool execution flow performed by ArcadeToolManager.execute_tool
"""
print(f"Executing tool: '{tool_name}' with inputs:")
for input_name, input_value in tool_input.items():
print(f" {input_name}: {input_value}")
# Execute the tool
response = manager._client.tools.execute(
tool_name=tool_name,
input=tool_input,
user_id=USER_ID,
)
# Handle the tool error if it exists
tool_error = response.output.error if response.output else None
if tool_error:
return str(tool_error)
# Return the tool output if the tool was executed successfully
if response.success:
return response.output.value # type: ignore[union-attr]
# Return a failure message if the tool was not executed successfully
return "Failed to call " + tool_name
def custom_tool_executor(
manager: ArcadeToolManager, tool_name: str, **tool_input: dict[str, Any]
) -> Any:
"""Custom tool executor for the ArcadeToolManager
ArcadeToolManager's default executor handles authorization and tool execution.
This function overrides the default executor to handle authorization and tool execution in a custom way.
"""
custom_auth_flow(manager, tool_name, **tool_input)
return custom_execute_flow(manager, tool_name, **tool_input)
def main() -> CrewOutput:
manager = ArcadeToolManager(
executor=custom_tool_executor,
)
tools = manager.get_tools(tools=["Google.ListEmails"])
crew_agent = Agent(
role="Main Agent",
backstory="You are a helpful assistant",
goal="Help the user with their requests",
tools=tools,
allow_delegation=False,
verbose=True,
llm=LLM(model="gpt-4o"),
)
task = Task(
description="Get the 5 most recent emails from the user's inbox and summarize them and recommend a response for each.",
expected_output="A bulleted list with a one sentence summary of each email and a recommended response to the email.",
agent=crew_agent,
tools=crew_agent.tools,
)
crew = Crew(
agents=[crew_agent],
tasks=[task],
verbose=True,
memory=True,
)
result = crew.kickoff()
return result
if __name__ == "__main__":
result = main()
print("\n\n\n ------------ Result ------------ \n\n\n")
print(result)

View file

@ -0,0 +1 @@
crewai-arcade>=0.1.0

View file

@ -0,0 +1,46 @@
"""
This is a simple example of how to use Arcade with CrewAI.
The example authenticates into the user's Gmail account, retrieves their 5 most recent emails, and summarizes them.
The example assumes the following:
1. You have an Arcade API key and have set the ARCADE_API_KEY environment variable.
2. You have an OpenAI API key and have set the OPENAI_API_KEY environment variable.
3. You have installed the necessary dependencies in the requirements.txt file: `pip install -r requirements.txt`
"""
from crewai import Agent, Crew, Task
from crewai.llm import LLM
from crewai_arcade import ArcadeToolManager
manager = ArcadeToolManager(default_user_id="user@example.com")
tools = manager.get_tools(tools=["Google.ListEmails"])
crew_agent = Agent(
role="Main Agent",
backstory="You are a helpful assistant",
goal="Help the user with their requests",
tools=tools,
allow_delegation=False,
verbose=True,
llm=LLM(model="gpt-4o"),
)
task = Task(
description="Get the 5 most recent emails from the user's inbox and summarize them and recommend a response for each.",
expected_output="A bulleted list with a one sentence summary of each email and a recommended response to the email.",
agent=crew_agent,
tools=crew_agent.tools,
)
crew = Crew(
agents=[crew_agent],
tasks=[task],
verbose=True,
memory=True,
)
result = crew.kickoff()
print("\n\n\n ------------ Result ------------ \n\n\n")
print(result)