Langchain arcade 1.2 (#282)

- **New Class Structure**: Introduced `ToolManager` and
`AsyncToolManager` classes (`ArcadeToolManager` is deprecated)
- **Async Support**: Full async implementation for modern LangChain
applications
- **Better Tool Management**: New methods for adding individual tools
and toolkits
- **CI/CD**: for langchain_arcade


## Upgrade Changes

```python
# Old pattern
manager = ArcadeToolManager(api_key="...")
tools = manager.get_tools(toolkits=["Google"])

# New pattern
manager = ToolManager(api_key="...")
manager.init_tools(toolkits=["Google"])
tools = manager.to_langchain()
```

Now supports underscores vs dots in tool names for better model
compatibility.
This commit is contained in:
Sam Partee 2025-03-10 18:52:06 -07:00 committed by GitHub
parent 4164f796b0
commit 140f4eca17
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1693 additions and 290 deletions

View file

@ -64,15 +64,3 @@ jobs:
if: ${{ matrix.python-version == '3.10' }}
with:
token: ${{ secrets.CODECOV_TOKEN }}
# docs don't work yet.
# check-docs:
# runs-on: ubuntu-latest
# steps:
# - name: Check out
# uses: actions/checkout@v4
#
# - name: Set up the environment
# uses: ./.github/actions/setup-poetry-env
#
# - name: Check if documentation can be built
# run: poetry run mkdocs build -s

74
.github/workflows/publish-langchain.yml vendored Normal file
View file

@ -0,0 +1,74 @@
name: Publish LangChain Arcade
on:
workflow_dispatch:
inputs:
version:
description: "Version to release (leave empty to use version from pyproject.toml)"
required: false
jobs:
test-and-publish:
runs-on: ubuntu-latest
permissions:
contents: write
packages: write
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: 1.8.5
- uses: actions/setup-python@v5
with:
python-version: "3.11"
cache: "pip"
- name: Test LangChain Arcade
working-directory: contrib/langchain
run: |
make install
make check
make test
- name: Set version if provided
if: inputs.version != ''
working-directory: contrib/langchain
run: |
poetry version ${{ inputs.version }}
- name: Publish to PyPI
working-directory: contrib/langchain
run: |
poetry build
# Extract version from pyproject.toml using poetry and save it
VERSION=$(poetry version -s)
echo "VERSION=$VERSION" >> $GITHUB_ENV
poetry config pypi-token.pypi ${{ secrets.PYPI_TOKEN }}
# Attempt to publish the toolkit to PyPI. Skip if the version already exists
if poetry publish --skip-existing 2>&1 | grep -q "File exists. Skipping"; then
echo "Version already exists on PyPI. Skipping publish."
echo "skip_publish=true" >> $GITHUB_OUTPUT
else
echo "skip_publish=false" >> $GITHUB_OUTPUT
fi
- name: Send status to Slack
if: steps.Publish_LangChain.outputs.skip_publish != 'true'
uses: slackapi/slack-github-action@v2.0.0
with:
webhook: ${{ secrets.SLACK_WEBHOOK_URL }}
webhook-type: webhook-trigger
payload: |
{
"status": "${{ job.status }}",
"project": "langchain_arcade",
"version": "${{ env.VERSION }}",
"url": "${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
}

66
.github/workflows/test-langchain.yml vendored Normal file
View file

@ -0,0 +1,66 @@
name: Test LangChain Arcade
on:
push:
branches:
- main
paths:
- "contrib/langchain/**"
pull_request:
types: [opened, synchronize, reopened, ready_for_review]
paths:
- "contrib/langchain/**"
jobs:
quality:
runs-on: ubuntu-latest
steps:
- name: Check out
uses: actions/checkout@v4
- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: 1.8.5
- name: Install
run: cd contrib/langchain && make install && make check
tox:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
fail-fast: false
steps:
- name: Check out
uses: actions/checkout@v4
- name: Set up python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: 1.8.5
- name: Install dependencies
run: cd contrib/langchain && make install
- name: Install tox
run: |
python -m pip install --upgrade pip
python -m pip install tox tox-gh-actions
- name: Test with tox
run: cd contrib/langchain && tox
- name: Upload coverage reports to Codecov with GitHub Action on Python 3.11
uses: codecov/codecov-action@v4.0.1
if: ${{ matrix.python-version == '3.11' }}
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: contrib/langchain/coverage.xml
flags: langchain

View file

@ -11,6 +11,9 @@
</a>
<a href="https://pepy.tech/project/langchain-arcade">
<img src="https://static.pepy.tech/badge/langchain-arcade" alt="Downloads">
<a href="https://pypi.org/project/langchain-arcade/">
<img src="https://img.shields.io/pypi/v/langchain-arcade.svg" alt="PyPI">
</a>
</a>
</div>
@ -24,7 +27,7 @@
## Overview
`langchain-arcade` allows you to use Arcade tools in your LangChain and LangGraph applications.
`langchain-arcade` allows you to use Arcade tools in your LangChain and LangGraph applications. This integration provides a simple way to access Arcade's extensive toolkit ecosystem, including tools for search, email, document processing, and more.
## Installation
@ -32,6 +35,137 @@
pip install langchain-arcade
```
## Usage
## Basic Usage
See the [examples](https://github.com/ArcadeAI/arcade-ai/tree/main/examples/langchain) for usage examples
### 1. Initialize the Tool Manager
The `ToolManager` is the main entry point for working with Arcade tools in LangChain:
```python
import os
from langchain_arcade import ToolManager
# Initialize with your API key
manager = ToolManager(api_key=os.environ["ARCADE_API_KEY"])
# Initialize with specific tools or toolkits
tools = manager.init_tools(
tools=["Web.ScrapeUrl"], # Individual tools
toolkits=["Search"] # All tools from a toolkit
)
# Convert to LangChain tools
langchain_tools = manager.to_langchain()
```
### 2. Use with LangGraph
Here's a simple example of using Arcade tools with LangGraph:
```python
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
# Create a LangGraph agent
model = ChatOpenAI(model="gpt-4o")
memory = MemorySaver()
graph = create_react_agent(model, tools, checkpointer=memory)
config = {"configurable": {"thread_id": "1", "user_id": "user@example.com"}}
user_input = {"messages": [("user", "List my important emails")]}
for chunk in graph.stream(user_input, config, stream_mode="values"):
print(chunk["messages"][-1].content)
```
## Using Tools with Authorization in LangGraph
Many Arcade tools require user authorization. Here's how to handle it:
### 1. Using with prebuilt agents
```python
import os
from langchain_arcade import ToolManager
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# Initialize tools
manager = ToolManager(api_key=os.environ["ARCADE_API_KEY"])
manager.init_tools(toolkits=["Github"])
tools = manager.to_langchain(use_interrupts=True)
# Create agent
model = ChatOpenAI(model="gpt-4o")
graph = create_react_agent(model, tools)
# Run the agent with the "user_id" field in the config
# IMPORTANT the "user_id" field is required for tools that require user authorization
config = {"configurable": {"user_id": "user@lgexample.com"}}
user_input = {"messages": [("user", "Star the arcadeai/arcade-ai repository on GitHub")]}
for chunk in graph.stream(user_input, config, debug=True):
if chunk.get("__interrupt__"):
# print the authorization url
print(chunk["__interrupt__"][0].value)
# visit the URL to authorize the tool
# once you have authorized the tool, you can run again and the agent will continue
elif chunk.get("agent"):
print(chunk["agent"]["messages"][-1].content)
# see the functional example for continuing the agent after authorization
# and for handling authorization errors gracefully
```
See the Functional examples in the [examples directory](https://github.com/ArcadeAI/arcade-ai/tree/main/examples/langchain) that continue the agent after authorization and handle authorization errors gracefully.
### Async Support
For asynchronous applications, use `AsyncToolManager`:
```python
import asyncio
from langchain_arcade import AsyncToolManager
async def main():
manager = AsyncToolManager(api_key=os.environ["ARCADE_API_KEY"])
await manager.init_tools(toolkits=["Google"])
tools = await manager.to_langchain()
# Use tools with async LangChain/LangGraph components
asyncio.run(main())
```
## Tool Authorization Flow
Many Arcade tools require user authorization. This can be handled in many ways but the `ToolManager` provides a simple flow that can be used with prebuilt agents and also the functional API. The typical flow is:
1. Attempt to use a tool that requires authorization
2. Check the state for interrupts from the `NodeInterrupt` exception (or Command)
3. Call `manager.authorize(tool_name, user_id)` to get an authorization URL
4. Present the URL to the user
5. Call `manager.wait_for_auth(auth_response.id)` to wait for completion
6. Resume the agent execution
## Available Toolkits
Arcade provides many toolkits including:
- `Search`: Google search, Bing search
- `Google`: Gmail, Google Drive, Google Calendar
- `Web`: Crawling, scraping, etc
- `Github`: Repository operations
- `Slack`: Sending messages to Slack
- `Linkedin`: Posting to Linkedin
- `X`: Posting and reading tweets on X
- And many more
For a complete list, see the [Arcade Toolkits documentation](https://docs.arcade.dev/toolkits).
## More Examples
For more examples, see the [examples directory](https://github.com/ArcadeAI/arcade-ai/tree/main/examples/langchain).

View file

@ -1,3 +1,7 @@
from .manager import ArcadeToolManager
from .manager import ArcadeToolManager, AsyncToolManager, ToolManager
__all__ = ["ArcadeToolManager"]
__all__ = [
"ToolManager",
"AsyncToolManager",
"ArcadeToolManager", # Deprecated
]

View file

@ -1,7 +1,7 @@
from typing import Any, Callable
from typing import Any, Callable, Union
from arcadepy import NOT_GIVEN, Arcade
from arcadepy.types import ToolDefinition
from arcadepy import NOT_GIVEN, Arcade, AsyncArcade
from arcadepy.types import ExecuteToolResponse, ToolDefinition
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field, create_model
@ -68,6 +68,51 @@ def tool_definition_to_pydantic_model(tool_def: ToolDefinition) -> type[BaseMode
)
def process_tool_execution_response(
execute_response: ExecuteToolResponse, tool_name: str, langgraph: bool
) -> Any:
"""Process the response from tool execution and handle errors appropriately.
Args:
execute_response: The response from tool execution
tool_name: The name of the tool that was executed
langgraph: Whether LangGraph-specific behavior is enabled
Returns:
The output value on success, or error details on failure
"""
if execute_response.success and execute_response.output is not None:
return execute_response.output.value
# Extract detailed error information
error_details = {
"error": "Unknown error occurred",
"tool": tool_name,
}
if execute_response.output is not None and execute_response.output.error is not None:
error = execute_response.output.error
error_message = str(error.message) if hasattr(error, "message") else "Unknown error"
error_details["error"] = error_message
# Add all non-None optional error fields to the details
if (
hasattr(error, "additional_prompt_content")
and error.additional_prompt_content is not None
):
error_details["additional_prompt_content"] = error.additional_prompt_content
if hasattr(error, "can_retry") and error.can_retry is not None:
error_details["can_retry"] = str(error.can_retry)
if hasattr(error, "developer_message") and error.developer_message is not None:
error_details["developer_message"] = str(error.developer_message)
if hasattr(error, "retry_after_ms") and error.retry_after_ms is not None:
error_details["retry_after_ms"] = str(error.retry_after_ms)
if langgraph:
raise NodeInterrupt(error_details)
return error_details
def create_tool_function(
client: Arcade,
tool_name: str,
@ -128,18 +173,13 @@ def create_tool_function(
user_id=user_id if user_id is not None else NOT_GIVEN,
)
if execute_response.success:
return execute_response.output.value # type: ignore[union-attr]
error_message = str(execute_response.output.error) # type: ignore[union-attr]
if langgraph:
raise NodeInterrupt(error_message)
return {"error": error_message}
return process_tool_execution_response(execute_response, tool_name, langgraph)
return tool_function
def wrap_arcade_tool(
client: Arcade,
client: Union[Arcade, AsyncArcade],
tool_name: str,
tool_def: ToolDefinition,
langgraph: bool = False,
@ -161,13 +201,23 @@ def wrap_arcade_tool(
args_schema = tool_definition_to_pydantic_model(tool_def)
# Create the action function
action_func = create_tool_function(
client=client,
tool_name=tool_name,
tool_def=tool_def,
args_schema=args_schema,
langgraph=langgraph,
)
if isinstance(client, Arcade):
action_func = create_tool_function(
client=client,
tool_name=tool_name,
tool_def=tool_def,
args_schema=args_schema,
langgraph=langgraph,
)
else:
# Use async tool function for AsyncArcade client
action_func = create_async_tool_function(
client=client,
tool_name=tool_name,
tool_def=tool_def,
args_schema=args_schema,
langgraph=langgraph,
)
# Create the StructuredTool instance
return StructuredTool.from_function(
@ -177,3 +227,68 @@ def wrap_arcade_tool(
args_schema=args_schema,
inject_kwargs={"user_id"},
)
def create_async_tool_function(
client: AsyncArcade,
tool_name: str,
tool_def: ToolDefinition,
args_schema: type[BaseModel],
langgraph: bool = False,
) -> Callable:
"""Create an async callable function to execute an Arcade tool.
Args:
client: The AsyncArcade client instance.
tool_name: The name of the tool to wrap.
tool_def: The ToolDefinition of the tool to wrap.
args_schema: The Pydantic model representing the tool's arguments.
langgraph: Whether to enable LangGraph-specific behavior.
Returns:
An async callable function that executes the tool.
"""
if langgraph and not LANGGRAPH_ENABLED:
raise ImportError("LangGraph is not installed. Please install it to use this feature.")
requires_authorization = (
tool_def.requirements is not None and tool_def.requirements.authorization is not None
)
async def tool_function(config: RunnableConfig, **kwargs: Any) -> Any:
"""Run the Arcade tool with the given parameters.
Args:
config: RunnableConfig containing execution context.
**kwargs: Tool input arguments.
Returns:
The output from the tool execution.
"""
user_id = config.get("configurable", {}).get("user_id") if config else None
if requires_authorization:
if user_id is None:
error_message = f"user_id is required to run {tool_name}"
if langgraph:
raise NodeInterrupt(error_message)
return {"error": error_message}
# Authorize the user for the tool
auth_response = await client.tools.authorize(tool_name=tool_name, user_id=user_id)
if auth_response.status != "completed":
auth_message = f"Please use the following link to authorize: {auth_response.url}"
if langgraph:
raise NodeInterrupt(auth_message)
return {"error": auth_message}
# Execute the tool with provided inputs
execute_response = await client.tools.execute(
tool_name=tool_name,
input=kwargs,
user_id=user_id if user_id is not None else NOT_GIVEN,
)
return process_tool_execution_response(execute_response, tool_name, langgraph)
return tool_function

View file

@ -1,91 +1,414 @@
import os
import warnings
from collections.abc import Iterator
from typing import Any, Optional
from typing import Any, Optional, Union
from arcadepy import Arcade
from arcadepy import NOT_GIVEN, Arcade, AsyncArcade
from arcadepy.types import ToolDefinition
from arcadepy.types.shared import AuthorizationResponse
from langchain_core.tools import StructuredTool
from langchain_arcade._utilities import (
wrap_arcade_tool,
)
from langchain_arcade._utilities import wrap_arcade_tool
ClientType = Union[Arcade, AsyncArcade]
class ArcadeToolManager:
class LangChainToolManager:
"""
Arcade tool manager for LangChain framework.
Base tool manager for LangChain framework.
Provides a common interface for both synchronous and asynchronous tool managers.
This class wraps Arcade tools as LangChain `StructuredTool`
objects for integration.
This class handles the storage and retrieval of tool definitions and provides
common functionality used by both synchronous and asynchronous implementations.
"""
def __init__(
self,
client: Optional[Arcade] = None,
**kwargs: dict[str, Any],
) -> None:
"""Initialize the ArcadeToolManager.
Example:
>>> manager = ArcadeToolManager(api_key="...")
>>>
>>> # retrieve a specific tool as a langchain tool
>>> manager.get_tools(tools=["Search.SearchGoogle"])
>>>
>>> # retrieve all tools in a toolkit as langchain tools
>>> manager.get_tools(toolkits=["Search"])
>>>
>>> # clear and initialize new tools in the manager
>>> manager.init_tools(tools=["Search.SearchGoogle"], toolkits=["Search"])
Args:
client: Optional Arcade client instance.
**kwargs: Additional keyword arguments to pass to the Arcade client.
"""
if not client:
api_key = kwargs.get("api_key", os.getenv("ARCADE_API_KEY"))
base_url = kwargs.get("base_url", os.getenv("ARCADE_BASE_URL"))
arcade_kwargs = {"api_key": api_key, **kwargs}
if base_url:
arcade_kwargs["base_url"] = base_url
client = Arcade(**arcade_kwargs) # type: ignore[arg-type]
self.client = client
def __init__(self) -> None:
self._tools: dict[str, ToolDefinition] = {}
@property
def tools(self) -> list[str]:
"""
Get the list of tools by name in the manager.
Returns:
A list of tool names (strings) currently stored in the manager.
"""
return list(self._tools.keys())
def __iter__(self) -> Iterator[tuple[str, ToolDefinition]]:
yield from self._tools.items()
def __len__(self) -> int:
"""Return the number of tools in the manager."""
return len(self._tools)
def _get_client_config(self, **kwargs: Any) -> dict[str, Any]:
"""
Get the client configurations from environment variables and kwargs.
If api_key or base_url are in the kwargs, they will be used.
Otherwise, the environment variables ARCADE_API_KEY and ARCADE_BASE_URL will be used.
If both are provided, the kwargs will take precedence.
Args:
**kwargs: Keyword arguments that may contain api_key and base_url.
Returns:
A dictionary of client configuration parameters.
"""
client_kwargs = {
"api_key": kwargs.get("api_key", os.getenv("ARCADE_API_KEY")),
}
base_url = kwargs.get("base_url", os.getenv("ARCADE_BASE_URL"))
if base_url:
client_kwargs["base_url"] = base_url
return client_kwargs
def _get_tool_definition(self, tool_name: str) -> ToolDefinition:
"""
Get a tool definition by name, raising an error if not found.
Args:
tool_name: The name of the tool to retrieve.
Returns:
The ToolDefinition for the specified tool.
Raises:
ValueError: If the tool is not found in the manager.
"""
try:
return self._tools[tool_name]
except KeyError:
raise ValueError(f"Tool '{tool_name}' not found in this manager instance")
def __getitem__(self, tool_name: str) -> ToolDefinition:
return self._tools[tool_name]
"""
Get a tool definition by name using dictionary-like access.
Args:
tool_name: The name of the tool to retrieve.
Returns:
The ToolDefinition for the specified tool.
Raises:
ValueError: If the tool is not found in the manager.
"""
return self._get_tool_definition(tool_name)
def requires_auth(self, tool_name: str) -> bool:
"""
Check if a tool requires authorization.
Args:
tool_name: The name of the tool to check.
Returns:
True if the tool requires authorization, False otherwise.
"""
tool_def = self._get_tool_definition(tool_name)
if tool_def.requirements is None:
return False
return tool_def.requirements.authorization is not None
class ToolManager(LangChainToolManager):
"""
Synchronous Arcade tool manager for LangChain framework.
This class wraps Arcade tools as LangChain StructuredTool objects for integration
with synchronous operations.
Example:
>>> manager = ToolManager(api_key="your-api-key")
>>> # Initialize with specific tools and toolkits
>>> manager.init_tools(tools=["Search.SearchGoogle"], toolkits=["Weather"])
>>> # Get tools as LangChain StructuredTools
>>> langchain_tools = manager.to_langchain()
>>> # Handle authorization for tools that require it
>>> if manager.requires_auth("Search.SearchGoogle"):
>>> auth_response = manager.authorize("Search.SearchGoogle", "user_123")
>>> manager.wait_for_auth(auth_response.id)
"""
def __init__(self, client: Optional[Arcade] = None, **kwargs: Any) -> None:
"""
Initialize the ToolManager.
Example:
>>> manager = ToolManager(api_key="your-api-key")
>>> # or with an existing client
>>> client = Arcade(api_key="your-api-key")
>>> manager = ToolManager(client=client)
Args:
client: Optional Arcade client instance. If not provided, one will be created.
**kwargs: Additional keyword arguments to pass to the Arcade client if creating one.
Common options include api_key and base_url.
"""
super().__init__()
if client is None:
client_kwargs = self._get_client_config(**kwargs)
client = Arcade(**client_kwargs)
self._client = client
@property
def definitions(self) -> list[ToolDefinition]:
"""
Get the list of tool definitions in the manager.
Returns:
A list of ToolDefinition objects currently stored in the manager.
"""
return list(self._tools.values())
def __iter__(self) -> Iterator[tuple[str, ToolDefinition]]:
"""
Iterate over the tools in the manager as (name, definition) pairs.
Returns:
Iterator over (tool_name, tool_definition) tuples.
"""
yield from self._tools.items()
def to_langchain(
self, use_interrupts: bool = True, use_underscores: bool = True
) -> list[StructuredTool]:
"""
Get the tools in the manager as LangChain StructuredTool objects.
Args:
use_interrupts: Whether to use interrupts for the tool. This is useful
for LangGraph workflows where you need to handle tool
authorization through state transitions.
use_underscores: Whether to use underscores for the tool name instead of periods.
For example, "Search_SearchGoogle" vs "Search.SearchGoogle".
Some model providers like OpenAI work better with underscores.
Returns:
List of StructuredTool instances ready to use with LangChain.
"""
tool_map = _create_tool_map(self.definitions, use_underscores=use_underscores)
return [
wrap_arcade_tool(self._client, tool_name, definition, langgraph=use_interrupts)
for tool_name, definition in tool_map.items()
]
def init_tools(
self,
tools: Optional[list[str]] = None,
toolkits: Optional[list[str]] = None,
) -> None:
"""Initialize the tools in the manager.
limit: Optional[int] = None,
offset: Optional[int] = None,
raise_on_empty: bool = True,
) -> list[StructuredTool]:
"""
Initialize the tools in the manager and return them as LangChain tools.
This will clear any existing tools in the manager.
This will clear any existing tools in the manager and replace them with
the new tools specified by the tools and toolkits parameters.
Note: In version 2.0+, this method returns a list of StructuredTool objects.
In earlier versions, it returned None.
Example:
>>> manager = ArcadeToolManager(api_key="...")
>>> manager.init_tools(tools=["Search.SearchGoogle"])
>>> manager.get_tools()
>>> manager = ToolManager(api_key="your-api-key")
>>> langchain_tools = manager.init_tools(tools=["Search.SearchGoogle"])
>>> # Use these tools with a LangChain chain or agent
>>> agent = Agent(tools=langchain_tools, llm=llm)
Args:
tools: Optional list of tool names to include.
toolkits: Optional list of toolkits to include.
tools: Optional list of specific tool names to include (e.g., "Search.SearchGoogle").
toolkits: Optional list of toolkit names to include all tools from (e.g., "Search").
limit: Optional limit on the number of tools to retrieve per request.
offset: Optional offset for paginated requests.
raise_on_empty: Whether to raise an error if no tools or toolkits are provided.
Returns:
List of StructuredTool instances ready to use with LangChain.
Raises:
ValueError: If no tools or toolkits are provided and raise_on_empty is True.
"""
self._tools = self._retrieve_tool_definitions(tools, toolkits)
tools_list = self._retrieve_tool_definitions(tools, toolkits, raise_on_empty, limit, offset)
self._tools = _create_tool_map(tools_list)
return self.to_langchain()
def authorize(self, tool_name: str, user_id: str) -> AuthorizationResponse:
"""
Authorize a user for a specific tool.
Example:
>>> manager = ToolManager(api_key="your-api-key")
>>> manager.init_tools(tools=["Gmail.SendEmail"])
>>> auth_response = manager.authorize("Gmail.SendEmail", "user_123")
>>> # auth_response.auth_url contains the URL for the user to authorize
Args:
tool_name: The name of the tool to authorize.
user_id: The user ID to authorize. This should be a unique identifier for the user.
Returns:
AuthorizationResponse containing authorization details, including the auth_url
that should be presented to the user to complete authorization.
"""
return self._client.tools.authorize(tool_name=tool_name, user_id=user_id)
def is_authorized(self, authorization_id: str) -> bool:
"""
Check if a tool authorization is complete.
Example:
>>> manager = ToolManager(api_key="your-api-key")
>>> auth_response = manager.authorize("Gmail.SendEmail", "user_123")
>>> # After user completes authorization
>>> is_complete = manager.is_authorized(auth_response.id)
Args:
authorization_id: The authorization ID to check. This can be the full AuthorizationResponse
object or just the ID string.
Returns:
True if the authorization is completed, False otherwise.
"""
# Handle case where entire AuthorizationResponse object is passed
if hasattr(authorization_id, "id"):
authorization_id = authorization_id.id
response = self._client.auth.status(id=authorization_id)
if response:
return response.status == "completed"
return False
def wait_for_auth(self, authorization_id: str) -> AuthorizationResponse:
"""
Wait for a tool authorization to complete. This method blocks until
the authorization is complete or fails.
Example:
>>> manager = ToolManager(api_key="your-api-key")
>>> auth_response = manager.authorize("Gmail.SendEmail", "user_123")
>>> # Share auth_response.auth_url with the user
>>> # Wait for the user to complete authorization
>>> completed_auth = manager.wait_for_auth(auth_response.id)
Args:
authorization_id: The authorization ID to wait for. This can be the full
AuthorizationResponse object or just the ID string.
Returns:
AuthorizationResponse with the completed authorization details.
"""
# Handle case where entire AuthorizationResponse object is passed
if hasattr(authorization_id, "id"):
authorization_id = authorization_id.id
return self._client.auth.wait_for_completion(authorization_id)
def _retrieve_tool_definitions(
self,
tools: Optional[list[str]] = None,
toolkits: Optional[list[str]] = None,
raise_on_empty: bool = True,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> list[ToolDefinition]:
"""
Retrieve tool definitions from the Arcade client, accounting for pagination.
Args:
tools: Optional list of specific tool names to include.
toolkits: Optional list of toolkit names to include all tools from.
raise_on_empty: Whether to raise an error if no tools or toolkits are provided.
limit: Optional limit on the number of tools to retrieve per request.
offset: Optional offset for paginated requests.
Returns:
List of ToolDefinition instances.
Raises:
ValueError: If no tools or toolkits are provided and raise_on_empty is True.
"""
all_tools: list[ToolDefinition] = []
# If no specific tools or toolkits are requested, raise an error.
if not tools and not toolkits:
if raise_on_empty:
raise ValueError("No tools or toolkits provided to retrieve tool definitions.")
return []
# Retrieve individual tools if specified
if tools:
for tool_id in tools:
single_tool = self._client.tools.get(name=tool_id)
all_tools.append(single_tool)
# Retrieve tools from specified toolkits
if toolkits:
for tk in toolkits:
# Convert None to NOT_GIVEN for Stainless client
paginated_tools = self._client.tools.list(
toolkit=tk,
limit=limit if limit is not None else NOT_GIVEN,
offset=offset if offset is not None else NOT_GIVEN,
)
all_tools.extend(paginated_tools)
return all_tools
def add_tool(self, tool_name: str) -> None:
"""
Add a single tool to the manager by name.
Unlike init_tools(), this method preserves existing tools in the manager
and only adds the specified tool.
Example:
>>> manager = ToolManager(api_key="your-api-key")
>>> manager.add_tool("Gmail.SendEmail")
>>> manager.add_tool("Search.SearchGoogle")
>>> # Get all tools including newly added ones
>>> all_tools = manager.to_langchain()
Args:
tool_name: The fully qualified name of the tool to add (e.g., "Search.SearchGoogle")
Raises:
ValueError: If the tool cannot be found
"""
tool = self._client.tools.get(name=tool_name)
self._tools.update(_create_tool_map([tool]))
def add_toolkit(
self, toolkit_name: str, limit: Optional[int] = None, offset: Optional[int] = None
) -> None:
"""
Add all tools from a specific toolkit to the manager.
Unlike init_tools(), this method preserves existing tools in the manager
and only adds the tools from the specified toolkit.
Example:
>>> manager = ToolManager(api_key="your-api-key")
>>> manager.add_toolkit("Gmail")
>>> manager.add_toolkit("Search")
>>> # Get all tools including newly added ones
>>> all_tools = manager.to_langchain()
Args:
toolkit_name: The name of the toolkit to add (e.g., "Search")
limit: Optional limit on the number of tools to retrieve per request
offset: Optional offset for paginated requests
Raises:
ValueError: If the toolkit cannot be found or has no tools
"""
tools = self._client.tools.list(
toolkit=toolkit_name,
limit=NOT_GIVEN if limit is None else limit,
offset=NOT_GIVEN if offset is None else offset,
)
for tool in tools:
self._tools.update(_create_tool_map([tool]))
def get_tools(
self,
@ -93,20 +416,11 @@ class ArcadeToolManager:
toolkits: Optional[list[str]] = None,
langgraph: bool = True,
) -> list[StructuredTool]:
"""Return the tools in the manager as LangChain StructuredTool objects.
"""
DEPRECATED: Return the tools in the manager as LangChain StructuredTool objects.
Note: if tools/toolkits are provided, the manager will update it's
internal tools using a dictionary update by tool name.
If langgraph is True, the tools will be wrapped with LangGraph-specific
behavior such as NodeInterrupts for auth.
Note: Changed in 1.0.0 to default to True.
Example:
>>> manager = ArcadeToolManager(api_key="...")
>>>
>>> # retrieve a specific tool as a langchain tool
>>> manager.get_tools(tools=["Search.SearchGoogle"])
This method is deprecated and will be removed in a future major version.
Please use `init_tools()` to initialize tools and `to_langchain()` to convert them.
Args:
tools: Optional list of tool names to include.
@ -117,103 +431,402 @@ class ArcadeToolManager:
Returns:
List of StructuredTool instances.
"""
# TODO account for versioning
warnings.warn(
"get_tools() is deprecated and will be removed in the next major version. "
"Please use init_tools() to initialize tools and to_langchain() to convert them.",
DeprecationWarning,
stacklevel=2,
)
# Support existing usage pattern
if tools or toolkits:
new_tools = self._retrieve_tool_definitions(tools, toolkits)
self._tools.update(new_tools)
elif len(self) == 0:
self.init_tools()
self.init_tools(tools=tools, toolkits=toolkits)
langchain_tools: list[StructuredTool] = []
for tool_name, definition in self:
lc_tool = wrap_arcade_tool(self.client, tool_name, definition, langgraph)
langchain_tools.append(lc_tool)
return langchain_tools
return self.to_langchain(use_interrupts=langgraph)
def authorize(self, tool_name: str, user_id: str) -> AuthorizationResponse:
"""Authorize a user for a tool.
class ArcadeToolManager(ToolManager):
"""
Deprecated alias for ToolManager.
ArcadeToolManager is deprecated and will be removed in the next major version.
Please use ToolManager instead.
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
warnings.warn(
"ArcadeToolManager is deprecated and will be removed in the next major version. "
"Please use ToolManager instead.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
class AsyncToolManager(LangChainToolManager):
"""
Async version of Arcade tool manager for LangChain framework.
This class wraps Arcade tools as LangChain StructuredTool objects for integration
with asynchronous operations.
Example:
>>> manager = AsyncToolManager(api_key="your-api-key")
>>> # Initialize with specific tools and toolkits
>>> await manager.init_tools(tools=["Search.SearchGoogle"], toolkits=["Weather"])
>>> # Get tools as LangChain StructuredTools
>>> langchain_tools = await manager.to_langchain()
>>> # Handle authorization for tools that require it
>>> if manager.requires_auth("Search.SearchGoogle"):
>>> auth_response = await manager.authorize("Search.SearchGoogle", "user_123")
>>> await manager.wait_for_auth(auth_response.id)
"""
def __init__(
self,
client: Optional[AsyncArcade] = None,
**kwargs: Any,
) -> None:
"""
Initialize the AsyncToolManager.
Example:
>>> manager = ArcadeToolManager(api_key="...")
>>> manager.authorize("X.PostTweet", "user_123")
>>> manager = AsyncToolManager(api_key="your-api-key")
>>> # or with an existing client
>>> client = AsyncArcade(api_key="your-api-key")
>>> manager = AsyncToolManager(client=client)
Args:
client: Optional AsyncArcade client instance. If not provided, one will be created.
**kwargs: Additional keyword arguments to pass to the AsyncArcade client if creating one.
Common options include api_key and base_url.
"""
super().__init__()
if not client:
client_kwargs = self._get_client_config(**kwargs)
client = AsyncArcade(**client_kwargs)
self._client = client
@property
def definitions(self) -> list[ToolDefinition]:
"""
Get the list of tool definitions in the manager.
Returns:
A list of ToolDefinition objects currently stored in the manager.
"""
return list(self._tools.values())
def __iter__(self) -> Iterator[tuple[str, ToolDefinition]]:
"""
Iterate over the tools in the manager as (name, definition) pairs.
Returns:
Iterator over (tool_name, tool_definition) tuples.
"""
yield from self._tools.items()
async def init_tools(
self,
tools: Optional[list[str]] = None,
toolkits: Optional[list[str]] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
raise_on_empty: bool = True,
) -> list[StructuredTool]:
"""
Initialize the tools in the manager asynchronously and return them as LangChain tools.
This will clear any existing tools in the manager and replace them with
the new tools specified by the tools and toolkits parameters.
Example:
>>> manager = AsyncToolManager(api_key="your-api-key")
>>> langchain_tools = await manager.init_tools(tools=["Search.SearchGoogle"])
>>> # Use these tools with a LangChain chain or agent
>>> agent = Agent(tools=langchain_tools, llm=llm)
Args:
tools: Optional list of specific tool names to include (e.g., "Search.SearchGoogle").
toolkits: Optional list of toolkit names to include all tools from (e.g., "Search").
limit: Optional limit on the number of tools to retrieve per request.
offset: Optional offset for paginated requests.
raise_on_empty: Whether to raise an error if no tools or toolkits are provided.
Returns:
List of StructuredTool instances ready to use with LangChain.
Raises:
ValueError: If no tools or toolkits are provided and raise_on_empty is True.
"""
tools_list = await self._retrieve_tool_definitions(
tools, toolkits, raise_on_empty, limit, offset
)
self._tools.update(_create_tool_map(tools_list))
return await self.to_langchain()
async def to_langchain(
self, use_interrupts: bool = True, use_underscores: bool = True
) -> list[StructuredTool]:
"""
Get the tools in the manager as LangChain StructuredTool objects asynchronously.
Args:
use_interrupts: Whether to use interrupts for the tool. This is useful
for LangGraph workflows where you need to handle tool
authorization through state transitions.
use_underscores: Whether to use underscores for the tool name instead of periods.
For example, "Search_SearchGoogle" vs "Search.SearchGoogle".
Some model providers like OpenAI work better with underscores.
Returns:
List of StructuredTool instances ready to use with LangChain.
"""
tool_map = _create_tool_map(self.definitions, use_underscores=use_underscores)
return [
wrap_arcade_tool(self._client, tool_name, definition, langgraph=use_interrupts)
for tool_name, definition in tool_map.items()
]
async def authorize(self, tool_name: str, user_id: str) -> AuthorizationResponse:
"""
Authorize a user for a tool.
Example:
>>> manager = AsyncToolManager(api_key="your-api-key")
>>> await manager.init_tools(tools=["Gmail.SendEmail"])
>>> auth_response = await manager.authorize("Gmail.SendEmail", "user_123")
>>> # auth_response.auth_url contains the URL for the user to authorize
Args:
tool_name: The name of the tool to authorize.
user_id: The user ID to authorize.
user_id: The user ID to authorize. This should be a unique identifier for the user.
Returns:
AuthorizationResponse
AuthorizationResponse containing authorization details, including the auth_url
that should be presented to the user to complete authorization.
"""
return self.client.tools.authorize(tool_name=tool_name, user_id=user_id)
return await self._client.tools.authorize(tool_name=tool_name, user_id=user_id)
def is_authorized(self, authorization_id: str) -> bool:
"""Check if a tool authorization is complete.
async def is_authorized(self, authorization_id: str) -> bool:
"""
Check if a tool authorization is complete.
Example:
>>> manager = ArcadeToolManager(api_key="...")
>>> manager.init_tools(toolkits=["Search"])
>>> manager.is_authorized("auth_123")
"""
return self.client.auth.status(id=authorization_id).status == "completed"
>>> manager = AsyncToolManager(api_key="your-api-key")
>>> auth_response = await manager.authorize("Gmail.SendEmail", "user_123")
>>> # After user completes authorization
>>> is_complete = await manager.is_authorized(auth_response.id)
def wait_for_auth(self, authorization_id: str) -> AuthorizationResponse:
"""Wait for a tool authorization to complete.
Args:
authorization_id: The authorization ID to check. This can be the full AuthorizationResponse
object or just the ID string.
Returns:
True if the authorization is completed, False otherwise.
"""
# Handle case where entire AuthorizationResponse object is passed
if hasattr(authorization_id, "id"):
authorization_id = authorization_id.id
auth_status = await self._client.auth.status(id=authorization_id)
return auth_status.status == "completed"
async def wait_for_auth(self, authorization_id: str) -> AuthorizationResponse:
"""
Wait for a tool authorization to complete. This method blocks until
the authorization is complete or fails.
Example:
>>> manager = ArcadeToolManager(api_key="...")
>>> manager.init_tools(toolkits=["Google.ListEmails"])
>>> response = manager.authorize("Google.ListEmails", "user_123")
>>> manager.wait_for_auth(response)
>>> # or
>>> manager.wait_for_auth(response.id)
>>> manager = AsyncToolManager(api_key="your-api-key")
>>> auth_response = await manager.authorize("Gmail.SendEmail", "user_123")
>>> # Share auth_response.auth_url with the user
>>> # Wait for the user to complete authorization
>>> completed_auth = await manager.wait_for_auth(auth_response.id)
Args:
authorization_id: The authorization ID to wait for. This can be the full
AuthorizationResponse object or just the ID string.
Returns:
AuthorizationResponse with the completed authorization details.
"""
return self.client.auth.wait_for_completion(authorization_id)
# Handle case where entire AuthorizationResponse object is passed
if hasattr(authorization_id, "id"):
authorization_id = authorization_id.id
def requires_auth(self, tool_name: str) -> bool:
"""Check if a tool requires authorization."""
return await self._client.auth.wait_for_completion(authorization_id)
tool_def = self._get_tool_definition(tool_name)
if tool_def.requirements is None:
return False
return tool_def.requirements.authorization is not None
async def _retrieve_tool_definitions(
self,
tools: Optional[list[str]] = None,
toolkits: Optional[list[str]] = None,
raise_on_empty: bool = True,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> list[ToolDefinition]:
"""
Retrieve tool definitions asynchronously from the Arcade client, accounting for pagination.
def _get_tool_definition(self, tool_name: str) -> ToolDefinition:
try:
return self._tools[tool_name]
except KeyError:
raise ValueError(f"Tool '{tool_name}' not found in this ArcadeToolManager instance")
Args:
tools: Optional list of specific tool names to include.
toolkits: Optional list of toolkit names to include all tools from.
raise_on_empty: Whether to raise an error if no tools or toolkits are provided.
limit: Optional limit on the number of tools to retrieve per request.
offset: Optional offset for paginated requests.
def _retrieve_tool_definitions(
self, tools: Optional[list[str]] = None, toolkits: Optional[list[str]] = None
) -> dict[str, ToolDefinition]:
"""Retrieve tool definitions from the Arcade client, accounting for pagination."""
Returns:
List of ToolDefinition instances.
Raises:
ValueError: If no tools or toolkits are provided and raise_on_empty is True.
"""
all_tools: list[ToolDefinition] = []
# If no specific tools or toolkits are requested, raise an error.
if not tools and not toolkits:
if raise_on_empty:
raise ValueError("No tools or toolkits provided to retrieve tool definitions.")
return []
# First, gather single tools if the user specifically requested them.
if tools:
for tool_id in tools:
# ToolsResource.get(...) returns a single ToolDefinition.
single_tool = self.client.tools.get(name=tool_id)
single_tool = await self._client.tools.get(name=tool_id)
all_tools.append(single_tool)
# Next, gather tool definitions from any requested toolkits.
if toolkits:
for tk in toolkits:
# tools.list(...) returns a paginated response (SyncOffsetPage),
# which has an __iter__ method that automatically iterates over all pages.
paginated_tools = self.client.tools.list(toolkit=tk)
all_tools.extend(paginated_tools)
# Convert None to NOT_GIVEN for Stainless client
paginated_tools = await self._client.tools.list(
toolkit=tk,
limit=NOT_GIVEN if limit is None else limit,
offset=NOT_GIVEN if offset is None else offset,
)
async for tool in paginated_tools:
all_tools.append(tool)
# If no specific tools or toolkits were requested, retrieve *all* tools.
if not tools and not toolkits:
paginated_all_tools = self.client.tools.list()
all_tools.extend(paginated_all_tools)
# Build a dictionary that maps the "full_tool_name" to the tool definition.
tool_definitions: dict[str, ToolDefinition] = {}
for tool in all_tools:
# For items returned by .list(), the 'toolkit' and 'name' attributes
# should be present as plain fields on the object. (No need to do toolkit.name)
full_tool_name = f"{tool.toolkit.name}_{tool.name}"
tool_definitions[full_tool_name] = tool
return all_tools
return tool_definitions
async def add_tool(self, tool_name: str) -> None:
"""
Add a single tool to the manager by name.
Unlike init_tools(), this method preserves existing tools in the manager
and only adds the specified tool.
Example:
>>> manager = AsyncToolManager(api_key="your-api-key")
>>> await manager.add_tool("Gmail.SendEmail")
>>> await manager.add_tool("Search.SearchGoogle")
>>> # Get all tools including newly added ones
>>> all_tools = await manager.to_langchain()
Args:
tool_name: The fully qualified name of the tool to add (e.g., "Search.SearchGoogle")
Raises:
ValueError: If the tool cannot be found
"""
tool = await self._client.tools.get(name=tool_name)
self._tools.update(_create_tool_map([tool]))
async def add_toolkit(
self, toolkit_name: str, limit: Optional[int] = None, offset: Optional[int] = None
) -> None:
"""
Add all tools from a specific toolkit to the manager.
Unlike init_tools(), this method preserves existing tools in the manager
and only adds the tools from the specified toolkit.
Example:
>>> manager = AsyncToolManager(api_key="your-api-key")
>>> await manager.add_toolkit("Gmail")
>>> await manager.add_toolkit("Search")
>>> # Get all tools including newly added ones
>>> all_tools = await manager.to_langchain()
Args:
toolkit_name: The name of the toolkit to add (e.g., "Search")
limit: Optional limit on the number of tools to retrieve per request
offset: Optional offset for paginated requests
Raises:
ValueError: If the toolkit cannot be found or has no tools
"""
paginated_tools = await self._client.tools.list(
toolkit=toolkit_name,
limit=NOT_GIVEN if limit is None else limit,
offset=NOT_GIVEN if offset is None else offset,
)
async for tool in paginated_tools:
self._tools.update(_create_tool_map([tool]))
async def get_tools(
self,
tools: Optional[list[str]] = None,
toolkits: Optional[list[str]] = None,
langgraph: bool = True,
) -> list[StructuredTool]:
"""
DEPRECATED: Return the tools in the manager as LangChain StructuredTool objects.
This method is deprecated and will be removed in a future major version.
Please use `init_tools()` to initialize tools and `to_langchain()` to convert them.
Args:
tools: Optional list of tool names to include.
toolkits: Optional list of toolkits to include.
langgraph: Whether to use LangGraph-specific behavior
such as NodeInterrupts for auth.
Returns:
List of StructuredTool instances.
"""
warnings.warn(
"get_tools() is deprecated and will be removed in the next major version. "
"Please use init_tools() to initialize tools and to_langchain() to convert them.",
DeprecationWarning,
stacklevel=2,
)
# Support existing usage pattern
if tools or toolkits:
return await self.init_tools(tools=tools, toolkits=toolkits)
return []
def _create_tool_map(
tools: list[ToolDefinition],
use_underscores: bool = True,
) -> dict[str, ToolDefinition]:
"""
Build a dictionary that maps the "full_tool_name" to the tool definition.
Args:
tools: List of ToolDefinition objects to map.
use_underscores: Whether to use underscores instead of periods in tool names.
For example, "Search_SearchGoogle" vs "Search.SearchGoogle".
Returns:
Dictionary mapping tool names to tool definitions.
Note:
This is a temporary solution to support the naming convention of certain model providers
like OpenAI, which work better with underscores in tool names.
"""
tool_map: dict[str, ToolDefinition] = {}
for tool in tools:
# Ensure toolkit name and tool name are not None before creating the key
toolkit_name = tool.toolkit.name if tool.toolkit and tool.toolkit.name else None
if toolkit_name and tool.name:
if use_underscores:
tool_name = f"{toolkit_name}_{tool.name}"
else:
tool_name = f"{toolkit_name}.{tool.name}"
tool_map[tool_name] = tool
return tool_map

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "langchain-arcade"
version = "1.1.0"
version = "1.2.0"
description = "An integration package connecting Arcade and Langchain/LangGraph"
authors = ["Arcade <dev@arcade.dev>"]
readme = "README.md"
@ -8,7 +8,7 @@ repository = "https://github.com/arcadeai/arcade-ai/tree/main/contrib/langchain"
license = "MIT"
[tool.poetry.dependencies]
python = ">=3.10,<3.13"
python = ">=3.10,<4"
arcadepy = "1.1.*"
langgraph = ">=0.2.67,<0.3.0"

View file

@ -1,22 +1,24 @@
from unittest.mock import MagicMock
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from arcadepy.pagination import SyncOffsetPage
from arcadepy import NOT_GIVEN
from arcadepy.pagination import AsyncOffsetPage, SyncOffsetPage
from arcadepy.types import ToolDefinition
from arcadepy.types.shared import AuthorizationResponse
from langchain_arcade.manager import ArcadeToolManager
from langchain_arcade.manager import ArcadeToolManager, AsyncToolManager, ToolManager
@pytest.fixture
def mock_arcade_client():
"""
A fixture to mock the Arcade client object for testing the ArcadeToolManager.
A fixture to mock the Arcade client object for testing the ToolManager.
This mocks all relevant methods used by the manager, including:
- tools.get
- tools.list
- tools.authorize
- auth.status
- auth.wait_for_completion
"""
mock_client = MagicMock()
# Mock the "tools" sub-client
@ -25,16 +27,54 @@ def mock_arcade_client():
mock_client.tools.authorize = MagicMock()
# Mock the "auth" sub-client
mock_client.auth.status = MagicMock()
mock_client.auth.wait_for_completion = MagicMock()
return mock_client
@pytest.fixture
def async_mock_arcade_client():
"""
A fixture to mock the Arcade client object for testing the AsyncToolManager.
"""
mock_client = AsyncMock()
mock_client.tools.get = AsyncMock()
mock_client.tools.list = AsyncMock()
mock_client.tools.authorize = AsyncMock()
mock_client.auth.status = AsyncMock()
mock_client.auth.wait_for_completion = AsyncMock()
return mock_client
@pytest.fixture
def manager(mock_arcade_client):
"""
A fixture that creates an ArcadeToolManager with the mocked Arcade client.
A fixture that creates a ToolManager with the mocked Arcade client.
"""
return ArcadeToolManager(client=mock_arcade_client)
return ToolManager(client=mock_arcade_client)
@pytest.fixture
def async_manager(async_mock_arcade_client):
"""
A fixture that creates an AsyncToolManager with the mocked Arcade client.
"""
return AsyncToolManager(client=async_mock_arcade_client)
@pytest.fixture(params=[("sync", False), ("async", True)])
def manager_fixture(request, manager, async_manager):
"""
A parameterized fixture that returns a tuple with:
- The appropriate manager (sync or async)
- A boolean indicating if it's async
- The appropriate mock client
"""
param_name, is_async = request.param
if is_async:
return async_manager, True
else:
return manager, False
@pytest.fixture
@ -98,101 +138,327 @@ def make_tool():
return _make_tool
def test_init_tools(manager, mock_arcade_client, make_tool):
async def maybe_await(obj, is_async):
"""Helper to handle both sync and async return values"""
if is_async:
return await obj
return obj
@pytest.mark.asyncio
async def test_init_tools_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client, make_tool
):
"""
Test that init_tools clears any existing tools and retrieves new ones
from either an explicit list of tools or an entire toolkit.
"""
# Arrange
manager, is_async = manager_fixture
client = async_mock_arcade_client if is_async else mock_arcade_client
mock_tool = make_tool("Search_SearchGoogle")
mock_arcade_client.tools.get.return_value = mock_tool
mock_arcade_client.tools.list.return_value = SyncOffsetPage(items=[mock_tool])
client.tools.get.return_value = mock_tool
page_cls = AsyncOffsetPage if is_async else SyncOffsetPage
client.tools.list.return_value = page_cls(items=[mock_tool])
# Act
manager.init_tools(tools=["Search_SearchGoogle"])
result = await maybe_await(manager.init_tools(tools=["Search_SearchGoogle"]), is_async)
# Assert
assert "Search_SearchGoogle" in manager.tools
assert manager._tools["Search_SearchGoogle"] == mock_tool
mock_arcade_client.tools.get.assert_called_once_with(name="Search_SearchGoogle")
client.tools.get.assert_called_once_with(name="Search_SearchGoogle")
# Verify the result is a list of StructuredTool objects
assert len(result) == 1
def test_get_tools_no_init(manager, mock_arcade_client, make_tool):
@pytest.mark.asyncio
async def test_to_langchain_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client, make_tool
):
"""
If get_tools is called without init_tools and no tools are specified,
it should call init_tools internally and fetch all available tools.
Test that to_langchain returns the tools as StructuredTool objects.
"""
# Arrange
mock_tool = make_tool("Search_SearchGoogle")
mock_arcade_client.tools.list.return_value = SyncOffsetPage(items=[mock_tool])
manager, is_async = manager_fixture
# Act
tools = manager.get_tools() # no param means manager auto-inits
mock_tool = make_tool("Search_SearchGoogle")
manager._tools = {"Search_SearchGoogle": mock_tool}
# Act - with default parameters
result = await maybe_await(manager.to_langchain(), is_async)
# Assert
assert len(tools) == 1
assert "Search_SearchGoogle" in manager.tools
assert manager._tools["Search_SearchGoogle"] == mock_tool
mock_arcade_client.tools.list.assert_called_once()
assert len(result) == 1
assert result[0].name == "Search_SearchGoogle"
# Act - with underscores=False
result = await maybe_await(manager.to_langchain(use_underscores=False), is_async)
# Assert
assert len(result) == 1
assert result[0].name == "Search.SearchGoogle"
def test_get_tools_with_explicit(manager, mock_arcade_client, make_tool):
@pytest.mark.asyncio
async def test_deprecated_get_tools_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client, make_tool
):
"""
If tools or toolkits are provided to get_tools, the manager should
retrieve or update the internal _tools dictionary accordingly,
then return them as StructuredTool objects.
Test that the deprecated get_tools method still works but issues a warning.
"""
# Arrange
manager, is_async = manager_fixture
client = async_mock_arcade_client if is_async else mock_arcade_client
mock_tool = make_tool("Search_SearchGoogle")
client.tools.get.return_value = mock_tool
manager._tools = {} # Ensure no tools are already loaded
# Act - Check for deprecation warning
with pytest.warns(DeprecationWarning):
result = await maybe_await(manager.get_tools(tools=["Search_SearchGoogle"]), is_async)
# Assert - Method should still work
assert len(result) == 1
assert "Search_SearchGoogle" in manager.tools
client.tools.get.assert_called_once_with(name="Search_SearchGoogle")
@pytest.mark.asyncio
async def test_add_tool_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client, make_tool
):
"""
Test that add_tool adds a single tool to the manager without clearing existing tools.
"""
# Arrange
manager, is_async = manager_fixture
client = async_mock_arcade_client if is_async else mock_arcade_client
# Set up two different mock tools
mock_tool_google = make_tool("Search_SearchGoogle")
mock_tool_bing = make_tool("Search_SearchBing")
mock_arcade_client.tools.get.side_effect = [mock_tool_google, mock_tool_bing]
# First tool already exists in manager
manager._tools = {"Search_SearchGoogle": mock_tool_google}
# Second tool will be added
client.tools.get.return_value = mock_tool_bing
# Act
retrieved_tools = manager.get_tools(tools=["Search_SearchGoogle", "Search_SearchBing"])
await maybe_await(manager.add_tool("Search_SearchBing"), is_async)
# Assert - Both tools should now be in the manager
assert "Search_SearchGoogle" in manager.tools
assert "Search_SearchBing" in manager.tools
assert len(manager.tools) == 2
client.tools.get.assert_called_once_with(name="Search_SearchBing")
@pytest.mark.asyncio
async def test_add_toolkit_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client, make_tool
):
"""
Test that add_toolkit adds all tools from a toolkit without clearing existing tools.
"""
# Arrange
manager, is_async = manager_fixture
client = async_mock_arcade_client if is_async else mock_arcade_client
# Create a tool that's already in the manager
mock_tool_google = make_tool("Search_SearchGoogle")
manager._tools = {"Search_SearchGoogle": mock_tool_google}
# Create tools to be added from the toolkit
mock_tool_bing = make_tool("Search_SearchBing")
mock_tool_ddg = make_tool("Search_SearchDuckDuckGo")
# Mock the response for toolkit listing
page_cls = AsyncOffsetPage if is_async else SyncOffsetPage
client.tools.list.return_value = page_cls(items=[mock_tool_bing, mock_tool_ddg])
# Act
await maybe_await(manager.add_toolkit("Search"), is_async)
# Assert - All tools should now be in the manager
assert len(manager.tools) == 3
assert "Search_SearchGoogle" in manager.tools
assert "Search_SearchBing" in manager.tools
assert "Search_SearchDuckDuckGo" in manager.tools
client.tools.list.assert_called_once_with(toolkit="Search", limit=NOT_GIVEN, offset=NOT_GIVEN)
@pytest.mark.asyncio
async def test_is_authorized_with_response_object_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client
):
"""
Test the is_authorized method accepting both authorization ID string and AuthorizationResponse.
"""
# Arrange
manager, is_async = manager_fixture
client = async_mock_arcade_client if is_async else mock_arcade_client
mock_type = AsyncMock if is_async else MagicMock
client.auth.status.return_value = mock_type(status="completed")
# Create an auth response object
auth_response = AuthorizationResponse(
id="auth_abc", status="pending", tool_fully_qualified_name="Search_SearchGoogle"
)
# Act - Test with string ID
status_result1 = await maybe_await(manager.is_authorized("auth_abc"), is_async)
# Act - Test with response object
status_result2 = await maybe_await(manager.is_authorized(auth_response), is_async)
# Assert
assert status_result1 is True
assert status_result2 is True
client.auth.status.assert_any_call(id="auth_abc")
client.auth.status.assert_any_call(
id="auth_abc"
) # Should be called with the same ID both times
@pytest.mark.asyncio
async def test_wait_for_auth_with_response_object_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client
):
"""
Test the wait_for_auth method accepting both authorization ID string and AuthorizationResponse.
"""
# Arrange
manager, is_async = manager_fixture
client = async_mock_arcade_client if is_async else mock_arcade_client
completed_response = AuthorizationResponse(
id="auth_abc", status="completed", tool_fully_qualified_name="Search_SearchGoogle"
)
client.auth.wait_for_completion.return_value = completed_response
# Create an auth response object
auth_response = AuthorizationResponse(
id="auth_abc", status="pending", tool_fully_qualified_name="Search_SearchGoogle"
)
# Act - Test with string ID
result1 = await maybe_await(manager.wait_for_auth("auth_abc"), is_async)
# Act - Test with response object
result2 = await maybe_await(manager.wait_for_auth(auth_response), is_async)
# Assert
assert result1 == completed_response
assert result2 == completed_response
client.auth.wait_for_completion.assert_any_call("auth_abc")
client.auth.wait_for_completion.assert_any_call(
"auth_abc"
) # Should be called with the same ID both times
@pytest.mark.asyncio
async def test_get_tools_no_init_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client, make_tool
):
"""
Test that the deprecated get_tools method without previous initialization
issues a warning and fetches tools.
"""
# Arrange
manager, is_async = manager_fixture
client = async_mock_arcade_client if is_async else mock_arcade_client
mock_tool = make_tool("Search_SearchGoogle")
page_cls = AsyncOffsetPage if is_async else SyncOffsetPage
client.tools.list.return_value = page_cls(items=[mock_tool])
# Act - Check for deprecation warning
with pytest.warns(DeprecationWarning):
tools = await maybe_await(
manager.get_tools(), is_async
) # No param means manager calls list
# Assert
assert len(tools) == 0
assert "Search_SearchGoogle" not in manager.tools
@pytest.mark.asyncio
async def test_get_tools_with_explicit_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client, make_tool
):
"""
Test that the deprecated get_tools method with explicitly specified tools
issues a warning and fetches the requested tools.
"""
# Arrange
manager, is_async = manager_fixture
client = async_mock_arcade_client if is_async else mock_arcade_client
mock_tool_google = make_tool("Search_SearchGoogle")
mock_tool_bing = make_tool("Search_SearchBing")
client.tools.get.side_effect = [mock_tool_google, mock_tool_bing]
# Act - Check for deprecation warning
with pytest.warns(DeprecationWarning):
retrieved_tools = await maybe_await(
manager.get_tools(tools=["Search_SearchGoogle", "Search_SearchBing"]), is_async
)
# Assert
assert len(retrieved_tools) == 2
assert set(manager.tools) == {"Search_SearchGoogle", "Search_SearchBing"}
mock_arcade_client.tools.get.assert_any_call(name="Search_SearchGoogle")
mock_arcade_client.tools.get.assert_any_call(name="Search_SearchBing")
client.tools.get.assert_any_call(name="Search_SearchGoogle")
client.tools.get.assert_any_call(name="Search_SearchBing")
def test_authorize(manager, mock_arcade_client):
def test_arcade_tool_manager_deprecation_warning():
"""
Test that the ArcadeToolManager class issues a deprecation warning.
"""
# Act - Check for deprecation warning
with pytest.warns(DeprecationWarning) as warnings_record:
ArcadeToolManager(client=MagicMock())
# Assert
assert any("ArcadeToolManager is deprecated" in str(w.message) for w in warnings_record)
@pytest.mark.asyncio
async def test_authorize_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client
):
"""
Test the authorize method to ensure it calls the Arcade client's
tools.authorize method correctly.
"""
# Arrange
mock_arcade_client.tools.authorize.return_value = AuthorizationResponse(
manager, is_async = manager_fixture
client = async_mock_arcade_client if is_async else mock_arcade_client
auth_response = AuthorizationResponse(
id="auth_123", status="pending", tool_fully_qualified_name="Search_SearchGoogle"
)
client.tools.authorize.return_value = auth_response
# Act
response = manager.authorize(tool_name="Search_SearchGoogle", user_id="user_123")
response = await maybe_await(
manager.authorize(tool_name="Search_SearchGoogle", user_id="user_123"), is_async
)
# Assert
assert response.id == "auth_123"
assert response.status == "pending"
mock_arcade_client.tools.authorize.assert_called_once_with(
client.tools.authorize.assert_called_once_with(
tool_name="Search_SearchGoogle", user_id="user_123"
)
def test_is_authorized(manager, mock_arcade_client):
"""
Test the is_authorized method which checks if authorization
has completed for a given authorization ID.
"""
# Arrange
mock_arcade_client.auth.status.return_value = MagicMock(status="completed")
# Act
status_result = manager.is_authorized("auth_abc")
# Assert
assert status_result is True
mock_arcade_client.auth.status.assert_called_once_with(id="auth_abc")
def test_requires_auth_true(manager, make_tool):
"""
Test the requires_auth method returning True if
@ -270,7 +536,7 @@ def test_retrieve_tool_definitions_tools_only(manager, mock_arcade_client, make_
# Assert
assert len(results) == 1
assert "Search_SearchGoogle" in results
assert results[0].fully_qualified_name == "Search_SearchGoogle"
mock_arcade_client.tools.get.assert_called_once_with(name="Search_SearchGoogle")
@ -287,25 +553,154 @@ def test_retrieve_tool_definitions_toolkits_only(manager, mock_arcade_client, ma
# Assert
assert len(results) == 1
assert "Search_SearchBing" in results
mock_arcade_client.tools.list.assert_called_once_with(toolkit="Search")
assert results[0].fully_qualified_name == "Search_SearchBing"
mock_arcade_client.tools.list.assert_called_once_with(
toolkit="Search", limit=NOT_GIVEN, offset=NOT_GIVEN
)
def test_retrieve_tool_definitions_no_args(manager, mock_arcade_client, make_tool):
def test_retrieve_tool_definitions_raise_on_empty(manager):
"""
Test the internal _retrieve_tool_definitions method when no
arguments are provided, retrieving all available tools.
Test that _retrieve_tool_definitions raises ValueError when no tools or toolkits
are provided and raise_on_empty is True.
"""
# Arrange
mock_tool1 = make_tool("Search_SearchGoogle")
mock_tool2 = make_tool("Search_SearchBing")
mock_arcade_client.tools.list.return_value = SyncOffsetPage(items=[mock_tool1, mock_tool2])
# Act & Assert
with pytest.raises(ValueError) as excinfo:
manager._retrieve_tool_definitions(tools=None, toolkits=None, raise_on_empty=True)
assert "No tools or toolkits provided" in str(excinfo.value)
def test_retrieve_tool_definitions_empty_no_raise(manager):
"""
Test that _retrieve_tool_definitions returns empty list when no tools or toolkits
are provided and raise_on_empty is False.
"""
# Act
results = manager._retrieve_tool_definitions()
results = manager._retrieve_tool_definitions(tools=None, toolkits=None, raise_on_empty=False)
# Assert
assert len(results) == 2
assert "Search_SearchGoogle" in results
assert "Search_SearchBing" in results
mock_arcade_client.tools.list.assert_called_once()
assert results == []
@pytest.mark.asyncio
async def test_retrieve_tool_definitions_with_limit_offset_parameterized(
manager_fixture, mock_arcade_client, async_mock_arcade_client, make_tool
):
"""
Test that _retrieve_tool_definitions respects limit and offset parameters.
"""
# Arrange
manager, is_async = manager_fixture
client = async_mock_arcade_client if is_async else mock_arcade_client
mock_tool = make_tool("Search_SearchGoogle")
page_cls = AsyncOffsetPage if is_async else SyncOffsetPage
client.tools.list.return_value = page_cls(items=[mock_tool])
# Act
if is_async:
results = await manager._retrieve_tool_definitions(toolkits=["Search"], limit=10, offset=5)
else:
results = manager._retrieve_tool_definitions(toolkits=["Search"], limit=10, offset=5)
# Assert
assert len(results) > 0
client.tools.list.assert_called_once_with(toolkit="Search", limit=10, offset=5)
def test_get_client_config_with_kwargs():
"""
Test that _get_client_config prioritizes kwargs over environment variables.
"""
# Arrange
manager = ToolManager(client=MagicMock()) # Client won't be used here
# Act
with patch.dict("os.environ", {"ARCADE_API_KEY": "env_key", "ARCADE_BASE_URL": "env_url"}):
result = manager._get_client_config(api_key="kwarg_key", base_url="kwarg_url")
# Assert
assert result["api_key"] == "kwarg_key"
assert result["base_url"] == "kwarg_url"
def test_get_client_config_with_env_vars():
"""
Test that _get_client_config falls back to environment variables when kwargs not provided.
"""
# Arrange
manager = ToolManager(client=MagicMock()) # Client won't be used here
# Act
with patch.dict("os.environ", {"ARCADE_API_KEY": "env_key", "ARCADE_BASE_URL": "env_url"}):
result = manager._get_client_config()
# Assert
assert result["api_key"] == "env_key"
assert result["base_url"] == "env_url"
def test_getitem_access(manager, make_tool):
"""
Test that __getitem__ allows dictionary-style access to tools.
"""
# Arrange
tool_name = "Search_SearchGoogle"
mock_tool_def = make_tool(tool_name)
manager._tools[tool_name] = mock_tool_def
# Act
definition = manager[tool_name]
# Assert
assert definition == mock_tool_def
def test_getitem_missing(manager):
"""
Test that __getitem__ raises ValueError for missing tools.
"""
# Act & Assert
with pytest.raises(ValueError) as excinfo:
_ = manager["Nonexistent.Tool"]
assert "Tool 'Nonexistent.Tool' not found" in str(excinfo.value)
def test_create_tool_map_with_underscores(make_tool):
"""
Test the _create_tool_map function with use_underscores=True.
"""
# Arrange
from langchain_arcade.manager import _create_tool_map
tool1 = make_tool("Search.SearchGoogle")
tool2 = make_tool("Gmail.SendEmail")
# Act
result = _create_tool_map([tool1, tool2], use_underscores=True)
# Assert
assert "Search_SearchGoogle" in result
assert "Gmail_SendEmail" in result
assert len(result) == 2
def test_create_tool_map_with_dots(make_tool):
"""
Test the _create_tool_map function with use_underscores=False.
"""
# Arrange
from langchain_arcade.manager import _create_tool_map
tool1 = make_tool("Search.SearchGoogle")
tool2 = make_tool("Gmail.SendEmail")
# Act
result = _create_tool_map([tool1, tool2], use_underscores=False)
# Assert
assert "Search.SearchGoogle" in result
assert "Gmail.SendEmail" in result
assert len(result) == 2

View file

@ -1,10 +1,16 @@
[tox]
skipsdist = true
envlist = py310, py311, py312
[testenv]
deps =
pytest
pytest-cov
[gh-actions]
python =
3.10: py310
3.11: py311
3.12: py312
[testenv]
passenv = PYTHON_VERSION
allowlist_externals = poetry
commands =
pytest --cov=langchain_arcade --cov-report=term-missing
poetry install -v --all-extras
pytest --doctest-modules tests --cov --cov-config=pyproject.toml --cov-report=xml

View file

@ -1,50 +1,60 @@
import os
from langchain_arcade import ArcadeToolManager
from langchain_arcade import ToolManager
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.prebuilt import create_react_agent
# 1) Set API keys (place your real keys in env variables or directly below)
arcade_api_key = os.environ.get("ARCADE_API_KEY", "YOUR_ARCADE_API_KEY")
openai_api_key = os.environ.get("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY")
# 2) Create an ArcadeToolManager and fetch tools from the "Google" toolkit.
manager = ArcadeToolManager(api_key=arcade_api_key)
# 2) Create an ToolManager and fetch/add tools/toolkits
manager = ToolManager(api_key=arcade_api_key)
# Tool names follow the format "ToolkitName.ToolName"
tools = manager.get_tools(tools=["Web.ScrapeUrl"])
tools = manager.init_tools(tools=["Web.ScrapeUrl"])
print(manager.tools)
# Get all tools from a toolkit
tools = manager.get_tools(toolkits=["Google"])
tools = manager.init_tools(toolkits=["Google"])
print(manager.tools)
# 3) Create a ChatOpenAI model and bind the Arcade tools.
model = ChatOpenAI(model="gpt-4o", api_key=openai_api_key)
bound_model = model.bind_tools(tools)
# add a tool
manager.add_tool("Search.SearchGoogle")
print(manager.tools)
# 4) Use MemorySaver for checkpointing.
# add a toolkit
manager.add_toolkit("Search")
print(manager.tools)
# 3) Get StructuredTool objects for langchain
lc_tools = manager.to_langchain()
# 4) Create a ChatOpenAI model and bind the Arcade tools.
model = ChatOpenAI(model="gpt-4o", api_key=openai_api_key)
bound_model = model.bind_tools(lc_tools)
# 5) Use MemorySaver for checkpointing.
memory = MemorySaver()
# 5) Create a ReAct-style agent from the prebuilt function.
graph = create_react_agent(model=bound_model, tools=tools, checkpointer=memory)
graph = create_react_agent(model=bound_model, tools=lc_tools, checkpointer=memory)
# 6) Provide basic config and a user query.
# Note: user_id is required for the tool to be authorized
config = {"configurable": {"thread_id": "1", "user_id": "user@example.coom"}}
config = {"configurable": {"thread_id": "1", "user_id": "userrerr@example.coom"}}
user_input = {"messages": [("user", "List any new and important emails in my inbox.")]}
# 7) Stream the agent's output. If the tool is unauthorized, it may trigger NodeInterrupt.
try:
for chunk in graph.stream(user_input, config, stream_mode="values"):
chunk["messages"][-1].pretty_print()
except NodeInterrupt as exc:
print(f"\nNodeInterrupt occurred: {exc}")
print("Please authorize the tool or update the request, then re-run.")
# 7) Stream the agent's output. If the tool is unauthorized, it may trigger interrupts
for chunk in graph.stream(user_input, config, stream_mode="values", debug=True):
chunk["messages"][-1].pretty_print()
# If you need to authorize, you can do so via:
# auth_res = manager.authorize("Google_ListEmails", user_id="someone@example.com")
# manager.wait_for_auth(auth_res.id)
# Then run the graph again or edit the final tool call and call graph.stream(None, config).
# if we were interrupted, we can check for interrupts in state
current_state = graph.get_state(config)
if current_state.tasks:
for task in current_state.tasks:
if hasattr(task, "interrupts"):
for interrupt in task.interrupts:
print(interrupt.value)
# Once you login using the printed link, you can resume the agent

View file

@ -1,7 +1,7 @@
import os
# Import necessary classes and modules
from langchain_arcade import ArcadeToolManager
from langchain_arcade import ToolManager
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
@ -9,11 +9,14 @@ from langgraph.prebuilt import ToolNode
arcade_api_key = os.environ["ARCADE_API_KEY"]
# Initialize the tool manager and fetch tools compatible with langgraph
tool_manager = ArcadeToolManager(api_key=arcade_api_key)
tools = tool_manager.get_tools(
toolkits=["Google"], langgraph=True
) # use langgraph-specific behavior
# Initialize the tool manager and fetch tools
manager = ToolManager(api_key=arcade_api_key)
manager.init_tools(toolkits=["Github"])
# convert to langchain tools and use interrupts for auth
tools = manager.to_langchain(use_interrupts=True)
# Initialize the prebuilt tool node
tool_node = ToolNode(tools)
# Create a language model instance and bind it with the tools
@ -36,7 +39,7 @@ def call_agent(state: MessagesState):
def should_continue(state: MessagesState):
if state["messages"][-1].tool_calls:
for tool_call in state["messages"][-1].tool_calls:
if tool_manager.requires_auth(tool_call["name"]):
if manager.requires_auth(tool_call["name"]):
return "authorization"
return "tools" # Proceed to tool execution if no authorization is needed
return END # End the workflow if no tool calls are present
@ -47,17 +50,17 @@ def authorize(state: MessagesState, config: dict):
user_id = config["configurable"].get("user_id")
for tool_call in state["messages"][-1].tool_calls:
tool_name = tool_call["name"]
if not tool_manager.requires_auth(tool_name):
if not manager.requires_auth(tool_name):
continue
auth_response = tool_manager.authorize(tool_name, user_id)
auth_response = manager.authorize(tool_name, user_id)
if auth_response.status != "completed":
# Prompt the user to visit the authorization URL
print(f"Visit the following URL to authorize: {auth_response.url}")
# wait for the user to complete the authorization
# and then check the authorization status again
tool_manager.wait_for_auth(auth_response.id)
if not tool_manager.is_authorized(auth_response.id):
manager.wait_for_auth(auth_response.id)
if not manager.is_authorized(auth_response.id):
# node interrupt?
raise ValueError("Authorization failed")
@ -90,13 +93,13 @@ if __name__ == "__main__":
"messages": [
{
"role": "user",
"content": "Check and see if I have any important emails in my inbox",
"content": "Star arcadeai/arcade-ai on github",
}
],
}
# Configuration with thread and user IDs for authorization purposes
config = {"configurable": {"thread_id": "4", "user_id": "user@example.com"}}
config = {"configurable": {"thread_id": "4", "user_id": "user@example.comm"}}
# Run the graph and stream the outputs
for chunk in graph.stream(inputs, config=config, stream_mode="values"):

View file

@ -1,3 +1,3 @@
langchain-google-community[gmail]>=0.1.1
langchain-openai>=0.1.1
langchain-arcade>=1.0.0
langchain-arcade>=1.2.0

View file

@ -1,21 +1,16 @@
## Setup
### API keys
Follow [these instructions](https://docs.arcade.dev/home/custom-tools/) to Install Arcade and create an API key.
This example is using OpenAI, as the LLM provider. Ensure you have an [OpenAI API key](https://platform.openai.com/docs/quickstart).
### Environment variables
### Environment
Copy the `env.example` file to `.env` and supply your API keys for **at least** `OPENAI_API_KEY` and `ARCADE_API_KEY`.
- Arcade API key: `ARCADE_API_KEY` (instructions [here](https://docs.arcade.dev/home/api-keys))
- OpenAI API key: `OPENAI_API_KEY` (instructions [here](https://platform.openai.com/docs/quickstart))
## Usage with LangGraph API
### Local testing with LangGraph Studio
For testing locally (e.g., currently supported only on MacOS), you can use the LangGraph Studio desktop application.
[Download LangGraph Studio](https://github.com/langchain-ai/langgraph-studio?tab=readme-ov-file#download) and open this directory in the Studio application.
The `langgraph.json` file in this directory specifies the graph that will be loaded in Studio.

View file

@ -2,7 +2,7 @@ import os
from datetime import datetime
from configuration import AgentConfigurable
from langchain_arcade import ArcadeToolManager
from langchain_arcade import ToolManager
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langgraph.errors import NodeInterrupt
@ -13,9 +13,9 @@ from langgraph.prebuilt import ToolNode
arcade_api_key = os.getenv("ARCADE_API_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")
toolkit = ArcadeToolManager(api_key=arcade_api_key)
# Retrieve tools compatible with LangGraph
tools = toolkit.get_tools()
manager = ToolManager(api_key=arcade_api_key)
manager.init_tools(toolkits=["Github"])
tools = manager.to_langchain(use_interrupts=True)
tool_node = ToolNode(tools)
PROMPT_TEMPLATE = f"""
@ -58,7 +58,7 @@ def should_continue(state: AgentState, config: dict):
def check_auth(state: AgentState, config: dict):
user_id = config["configurable"].get("user_id")
tool_name = state["messages"][-1].tool_calls[0]["name"]
auth_response = toolkit.authorize(tool_name, user_id)
auth_response = manager.authorize(tool_name, user_id)
if auth_response.status != "completed":
return {"auth_url": auth_response.url}
else:
@ -69,7 +69,7 @@ def authorize(state: AgentState, config: dict):
"""Function to handle tool authorization"""
user_id = config["configurable"].get("user_id")
tool_name = state["messages"][-1].tool_calls[0]["name"]
auth_response = toolkit.authorize(tool_name, user_id)
auth_response = manager.authorize(tool_name, user_id)
if auth_response.status != "completed":
auth_message = (
f"Please authorize the application in your browser:\n\n {state.get('auth_url')}"