arcade-mcp/toolkits/slack/arcade_slack/tools/chat.py
Nate Barbettini 799d376ae5
SDK: Generic OAuth 2.0 connector (#81)
- Implements https://app.clickup.com/t/86b1whxb3 on the SDK side
- - Corresponding Engine PR:
https://github.com/ArcadeAI/Engine/pull/113/files?w=1
- Updates existing toolkits with new syntax.
2024-10-03 16:40:02 -07:00

131 lines
4.3 KiB
Python

from typing import Annotated
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from arcade.core.errors import RetryableToolError, ToolExecutionError
from arcade.core.schema import ToolContext
from arcade.sdk import tool
from arcade.sdk.auth import Slack
@tool(
requires_auth=Slack(
scopes=[
"chat:write",
"im:write",
"users.profile:read",
"users:read",
],
)
)
def send_dm_to_user(
context: ToolContext,
user_name: Annotated[
str,
"The Slack username of the person you want to message. Slack usernames are ALWAYS lowercase.",
],
message: Annotated[str, "The message you want to send"],
):
"""Send a direct message to a user in Slack."""
slackClient = WebClient(token=context.authorization.token)
try:
# Step 1: Retrieve the user's Slack ID based on their username
userListResponse = slackClient.users_list()
user_id = None
for user in userListResponse["members"]:
if user["name"].lower() == user_name.lower():
user_id = user["id"]
break
if not user_id:
raise RetryableToolError(
"User not found",
developer_message=f"User with username '{user_name}' not found.",
additional_prompt_content=format_users(userListResponse),
retry_after_ms=500, # Play nice with Slack API rate limits
)
# Step 2: Retrieve the DM channel ID with the user
im_response = slackClient.conversations_open(users=[user_id])
dm_channel_id = im_response["channel"]["id"]
# Step 3: Send the message as if it's from you (because we're using a user token)
slackClient.chat_postMessage(channel=dm_channel_id, text=message)
except SlackApiError as e:
error_message = e.response["error"] if "error" in e.response else str(e)
raise ToolExecutionError(
"Error sending message",
developer_message=f"Slack API Error: {error_message}",
)
def format_users(userListResponse: dict) -> str:
csv_string = "All active Slack users:\n\nname,real_name\n"
for user in userListResponse["members"]:
if not user.get("deleted", False):
name = user.get("name", "")
real_name = user.get("profile", {}).get("real_name", "")
csv_string += f"{name},{real_name}\n"
return csv_string.strip()
@tool(
requires_auth=Slack(
scopes=[
"chat:write",
"channels:read",
"groups:read",
],
)
)
def send_message_to_channel(
context: ToolContext,
channel_name: Annotated[
str,
"The Slack channel name where you want to send the message. Slack channel names are ALWAYS lowercase.",
],
message: Annotated[str, "The message you want to send"],
):
"""Send a message to a channel in Slack."""
slackClient = WebClient(token=context.authorization.token)
try:
# Step 1: Retrieve the list of channels
channels_response = slackClient.conversations_list()
channel_id = None
for channel in channels_response["channels"]:
if channel["name"].lower() == channel_name.lower():
channel_id = channel["id"]
break
if not channel_id:
raise RetryableToolError(
"Channel not found",
developer_message=f"Channel with name '{channel_name}' not found.",
additional_prompt_content=format_channels(channels_response),
retry_after_ms=500, # Play nice with Slack API rate limits
)
# Step 2: Send the message to the channel
slackClient.chat_postMessage(channel=channel_id, text=message)
except SlackApiError as e:
error_message = e.response["error"] if "error" in e.response else str(e)
raise ToolExecutionError(
"Error sending message",
developer_message=f"Slack API Error: {error_message}",
)
def format_channels(channels_response: dict) -> str:
csv_string = "All active Slack channels:\n\nname\n"
for channel in channels_response["channels"]:
if not channel.get("is_archived", False):
name = channel.get("name", "")
csv_string += f"{name}\n"
return csv_string.strip()