arcade-mcp/toolkits/gmail/arcade_gmail/tools/gmail.py
Nate Barbettini d90101ea70
Fix authorized tools (#14)
A few quick fixes while testing the gmail tool with the real Engine:
- Renamed `tool.requirements.auth` to `authorization` -- Engine already
used `authorization`
- Fixed the credentials initializer in the gmail tool
2024-08-16 17:13:44 -07:00

94 lines
2.9 KiB
Python

import re
from base64 import urlsafe_b64decode
from typing import Annotated
from bs4 import BeautifulSoup
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from arcade.core.schema import ToolContext
from arcade.sdk import tool
from arcade.sdk.auth import OAuth2
@tool(
    requires_auth=OAuth2(
        authority="https://accounts.google.com",
        scope=["https://www.googleapis.com/auth/gmail.readonly"],
    )
)
async def get_emails(
    context: ToolContext,
    n_emails: Annotated[int, "Number of emails to read"] = 5,
) -> dict[str, list[dict[str, str]]]:
    """Read emails from a Gmail account and extract plain text content, removing any HTML."""
    # NOTE(review): the docstring above is kept verbatim because the @tool
    # decorator may surface it as the tool description -- confirm before rewording.
    #
    # Returns {"emails": [...]}; each entry has the keys "from", "date",
    # "subject" and "body". The list is empty when the mailbox has no messages.
    # Messages that cannot be parsed or decoded are reported and skipped.

    # Build a Gmail API client authenticated with the caller's OAuth2 token.
    service = build("gmail", "v1", credentials=Credentials(context.authorization.token))

    # Request the message listing; each entry is only used for its "id" below.
    result = service.users().messages().list(userId="me").execute()
    # A missing/empty "messages" key yields an empty result with the same shape
    # as the normal return value (previously an empty string was returned,
    # which contradicted the declared return type).
    messages = result.get("messages") or []

    emails = []
    for msg in messages[:n_emails]:
        txt = service.users().messages().get(userId="me", id=msg["id"]).execute()
        try:
            payload = txt["payload"]

            # Reset the header fields for every message so a missing header can
            # neither raise NameError nor leak the previous email's value.
            from_ = ""
            date = ""
            subject = "No subject"
            for header in payload["headers"]:
                # Header field names are case-insensitive, so compare lowercased.
                name = header["name"].lower()
                if name == "from":
                    from_ = header["value"]
                elif name == "date":
                    date = header["value"]
                elif name == "subject":
                    # Only an actual Subject header overrides the default; the
                    # old `else` branch reset it on every non-Subject header.
                    subject = header["value"]

            # Multipart messages carry their content in "parts" (the first part
            # is used, as before); single-part messages carry it directly on
            # the payload's own "body".
            parts = payload.get("parts")
            body = parts[0].get("body") if parts else payload.get("body")
            data = body.get("data") if body else None
            if data:
                data = urlsafe_b64decode(data).decode()

            emails.append(
                {
                    "from": from_,
                    "date": date,
                    "subject": subject,
                    "body": clean_email_body(data) if data else "",
                }
            )
        except Exception as e:
            # Deliberate best-effort: one malformed message (missing keys, bad
            # base64, undecodable bytes) must not abort the whole batch.
            print(f"Error reading email {msg['id']}: {e}", "ERROR")

    return {"emails": emails}
def clean_email_body(body: str) -> str:
    """Strip HTML markup and non-sentence noise from an email body.

    The body is parsed with BeautifulSoup's "html.parser" and flattened to
    text with a single space between text nodes. Email addresses, URLs and
    any character that is not a letter, digit, ".", "!", "?" or whitespace
    are then deleted, and runs of whitespace are collapsed to single spaces.

    Args:
        body: Raw (possibly HTML) email body text.

    Returns:
        The cleaned, single-spaced plain text.
    """
    plain = BeautifulSoup(body, "html.parser").get_text(separator=" ")

    # Applied in order: an "@"-containing token is removed before URL matching.
    scrub_patterns = (
        r"\S*@\S*\s?",  # email addresses (any token containing "@")
        r"http\S+",  # URLs
        r"[^.!?a-zA-Z0-9\s]",  # non-sentence characters
    )
    for pattern in scrub_patterns:
        plain = re.sub(pattern, "", plain)

    # Splitting on whitespace and re-joining collapses runs and trims the ends.
    return " ".join(plain.split())