Working read email call

This commit is contained in:
Sam Partee 2024-04-25 18:20:31 -07:00
parent 56531cbd18
commit 9ba728f755
7 changed files with 148 additions and 81 deletions

View file

@ -9,21 +9,22 @@ from pydantic import BaseModel
import pandas as pd
from toolserve.sdk import Param, Secret, tool
from toolserve.sdk import Param, tool, get_secret
@tool
def send_email(
sender_email: Param(str, "Email address of the sender"),
sender_password: Secret(str, "gmail_password"),
recipient_email: Param(str, "Email address of the recipient"),
subject: Param(str, "Subject of the email"),
body: Param(str, "Body of the email"),
server: Secret(str, "gmail_stmp_server"),
port: Secret(str, "gmail_smtp_port")
):
"""Send an email via gmail SMTP server"""
sender_password = get_secret("gmail_password")
server = get_secret("gmail_stmp_server", "smtp.gmail.com")
port = get_secret("gmail_smtp_port", 587)
message = MIMEMultipart()
message['From'] = sender_email
message['To'] = recipient_email
@ -44,25 +45,27 @@ def send_email(
@tool
def read_email(
email: Param(str, "Email address of the recipient"),
password: Secret(str, "gmail_password"),
server: Secret(str, "gmail_stmp_server"),
port: Secret(int, "gmail_smtp_port")
email_address: Param(str, "Email address of the recipient"),
n_emails: Param(int, "Number of emails to read") = 5,
) -> Param(str, "JSON dataframe of List of emails"):
"""Read emails from a Gmail account"""
password = get_secret("gmail_password")
server = get_secret("gmail_stmp_server", "smtp.gmail.com")
port = get_secret("gmail_smtp_port", 587)
# Connect to the Gmail IMAP server
mail = imaplib.IMAP4_SSL(server)
mail.login(email, password)
mail.login(email_address, password)
mail.select("inbox") # connect to inbox.
result, data = mail.search(None, "ALL")
email_ids = data[0].split()
email_ids.reverse() # Reverse to get the most recent emails first
emails = []
for email_id in email_ids:
for email_id in email_ids[:n_emails]:
result, data = mail.fetch(email_id, "(RFC822)")
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
@ -70,16 +73,16 @@ def read_email(
email_details = {
"from": msg["From"],
"to": msg["To"],
"subject": decode_header(msg["Subject"])[0][0],
#"subject": decode_header(msg["Subject"])[0][0],
"date": msg["Date"]
}
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
email_details["body"] = part.get_payload(decode=True).decode()
email_details["body"] = part.get_payload(decode=True)
else:
email_details["body"] = msg.get_payload(decode=True).decode()
email_details["body"] = msg.get_payload(decode=True)
emails.append(email_details)

View file

@ -56,7 +56,7 @@ class ResponseBase:
"""
@staticmethod
async def __response(*, res: CustomResponseCode | CustomResponse = None, data: Any | None = None) -> ResponseModel:
async def __response(*, res: CustomResponseCode | CustomResponse = None, msg: str | None = None, data: Any | None = None) -> ResponseModel:
"""
General method for successful response
@ -64,7 +64,8 @@ class ResponseBase:
:param data: Response data
:return:
"""
return ResponseModel(code=res.code, msg=res.msg, data=data)
msg = msg if msg else res.msg
return ResponseModel(code=res.code, msg=msg, data=data)
async def success(
self,
@ -82,5 +83,14 @@ class ResponseBase:
) -> ResponseModel:
return await self.__response(res=res, data=data)
async def error(
self,
*,
res: CustomResponseCode | CustomResponse = CustomResponseCode.HTTP_400,
msg: str = CustomResponseCode.HTTP_400.msg,
data: Any = None,
) -> ResponseModel:
return await self.__response(res=res, msg=msg, data=data)
response_base = ResponseBase()

View file

@ -2,5 +2,5 @@
from .tool import (
Param,
tool,
Secret
get_secret
)

View file

@ -1,23 +1,35 @@
from typing import Annotated, TypeVar, _AnnotatedAlias, Type, Callable, Any
from typing import Annotated, TypeVar, _AnnotatedAlias, Type, Callable, Any, Optional
import functools
import os
import asyncio
T = TypeVar('T')
class SecretKey:
def __init__(self, key: str):
self.key = key
class Description:
def __init__(self, description: str):
self.description = description
def __str__(self):
return self.description
def Param(type_: Type[T], description: str) -> Annotated[T, Description]:
return Annotated[type_, Description(description)]
def Secret(type_: Type[T], key: str) -> Annotated[T, SecretKey]:
return Annotated[type_, SecretKey(key)]
def tool(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
return func(*args, **kwargs)
async def wrapper(*args, **kwargs) -> Any:
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
loop = asyncio.get_running_loop()
partial_func = functools.partial(func, *args, **kwargs)
return await loop.run_in_executor(None, partial_func)
return wrapper
def get_secret(name: str, default: Optional[Any] = None) -> str:
secret = os.getenv(name)
if secret is None:
if default is not None:
return default
raise ValueError(f"Secret {name} is not set.")
return secret

View file

@ -6,6 +6,7 @@ import inspect
from datetime import datetime
from typing import List, Optional, Type, Dict, Annotated, Any, Callable, Tuple
from pathlib import Path
import asyncio
from fastapi import APIRouter
from pydantic import BaseModel, ValidationError, Field, create_model
@ -15,7 +16,8 @@ from toolserve.server.core.conf import settings
from toolserve.common.response_code import CustomResponseCode
from toolserve.common.response import ResponseModel, response_base
from toolserve.apm.base import ToolPack
from toolserve.sdk import Param, Secret
from toolserve.sdk import Param
from toolserve.utils import snake_to_camel
class ToolMeta(BaseModel):
module: str
@ -47,6 +49,7 @@ class ToolCatalog:
tools = {}
for name, tool_spec in toolpack.tools.items():
print(name, tool_spec)
module_name, versioned_tool = tool_spec.split('.', 1)
func_name, version = versioned_tool.split('@')
@ -57,14 +60,16 @@ class ToolCatalog:
module=module_name,
path=module.__file__
)
input_model, output_model = create_pydantic_models_for_ds_tool(tool)
input_model, output_model = create_func_models(tool)
response_model = create_response_model(name, output_model)
tool_schema = ToolSchema(
name=name,
description=tool.__doc__,
version=version,
tool=tool,
input_model=input_model,
output_model=output_model,
output_model=response_model,
meta=tool_meta
)
tools[name] = tool_schema
@ -87,43 +92,91 @@ class ToolCatalog:
def list_tools(self) -> List[Dict[str, str]]:
return [{'name': t.name, 'description': t.description} for t in self.tools]
# ActionCatalog class
def create_pydantic_models_for_ds_tool(func: Callable) -> Tuple[Type[BaseModel], Type[BaseModel]]:
"""
Dynamically create Pydantic models for the input and output of a function decorated with "@ds.tool".
Parameters:
- func: The function to analyze and create models for.
def create_func_models(func: Callable) -> Tuple[Type[BaseModel], Type[BaseModel]]:
"""
Analyze a function to create corresponding Pydantic models for its input and output.
Args:
func (Callable): The function to analyze.
Returns:
- A tuple containing the original function, the input Pydantic model, and the output Pydantic model.
Tuple[Type[BaseModel], Type[BaseModel]]: A tuple containing the input and output Pydantic models.
"""
# Extract the function signature
sig = inspect.signature(func)
input_fields = {}
for name, param in sig.parameters.items():
# Determine the type of parameter, handling special types like Param and Secret
annotation = param.annotation
if hasattr(annotation, '__origin__') and annotation.__origin__ in [Param, Secret]:
# Extract the inner type and description from Param/Secret
field_type = annotation.__args__[0]
description = annotation.__metadata__[0] if annotation.__metadata__ else ""
default = param.default if param.default is not inspect.Parameter.empty else ...
input_fields[name] = (field_type, default, description)
else:
input_fields[name] = (param.annotation, param.default)
if asyncio.iscoroutinefunction(func):
func = func.__wrapped__
for name, param in inspect.signature(func, follow_wrapped=True).parameters.items():
field_info = extract_field_info(param)
input_fields[name] = (field_info['type'], Field(**field_info['field_params']))
# Create the input model dynamically
input_model = create_model(f"{func.__name__}Input", **input_fields)
input_model = create_model(f"{snake_to_camel(func.__name__)}Input", **input_fields)
output_model = determine_output_model(func)
# Dynamically create the output model, handling complex return types with appropriate annotations
output_fields = {}
return_annotation = sig.return_annotation
if not return_annotation is inspect.Signature.empty:
if hasattr(return_annotation, '__args__'): # Check if it's a generic type (e.g., List[int])
output_fields = {'result': (return_annotation.__args__[0], ...)}
else:
output_fields = {'result': (return_annotation, ...)}
output_model = create_model(f"{func.__name__}Output", **output_fields)
return input_model, output_model
def extract_field_info(param: inspect.Parameter) -> dict:
"""
Extract type and field parameters from a function parameter.
Args:
param (inspect.Parameter): The parameter to extract information from.
Returns:
dict: A dictionary with 'type' and 'field_params'.
"""
annotation = param.annotation
default = param.default if param.default is not inspect.Parameter.empty else None
description = getattr(annotation, '__metadata__', [None])[0] if hasattr(annotation, '__metadata__') else None
field_params = {
'default': default,
'description': str(description) if description else "No description provided."
}
# Handle specific annotations like Param and Secret if needed
if hasattr(annotation, '__origin__') and annotation.__origin__ in [Param]:
field_type = annotation.__args__[0]
else:
field_type = annotation
return {'type': field_type, 'field_params': field_params}
def determine_output_model(func: Callable) -> Type[BaseModel]:
"""
Determine the output model for a function based on its return annotation.
Args:
func (Callable): The function to analyze.
Returns:
Type[BaseModel]: A Pydantic model representing the output.
"""
return_annotation = inspect.signature(func).return_annotation
if return_annotation is inspect.Signature.empty:
return create_model(f"{snake_to_camel(func.__name__)}Output")
elif hasattr(return_annotation, '__origin__'):
field_type = Optional[return_annotation.__args__[0]]
description = return_annotation.__metadata__[0] if return_annotation.__metadata__ else ""
if description:
return create_model(f"{snake_to_camel(func.__name__)}Output", result=(field_type, Field(description=str(description))))
else:
return create_model(f"{snake_to_camel(func.__name__)}Output", result=(return_annotation, Field(description="No description provided.")))
def create_response_model(name: str, output_model: Type[BaseModel]) -> Type[ResponseModel]:
"""
Create a response model for the given schema.
"""
# Create a new response model
response_model = create_model(
f"{name}Response",
code=(int, CustomResponseCode.HTTP_200.code),
msg=(str, CustomResponseCode.HTTP_200.msg),
data=(Optional[output_model], None)
)
return response_model

View file

@ -24,12 +24,13 @@ def create_endpoint_function(name, description, func, input_model, output_model)
try:
# Execute the action
result = await func(**body.dict())
valid_result = output_model(**result)
return await response_base.success(data=valid_result.dict())
return await response_base.success(data={"result": result})
except ValidationError as e:
return await response_base.fail(res=CustomResponseCode.HTTP_400, msg=str(e))
return await response_base.error(res=CustomResponseCode.HTTP_400, msg=str(e))
except Exception as e:
return await response_base.fail(res=CustomResponseCode.HTTP_500, msg=str(e))
import traceback
print(traceback.format_exc())
return await response_base.error(res=CustomResponseCode.HTTP_500, msg=str(e))
run.__name__ = name
run.__doc__ = description
@ -37,19 +38,6 @@ def create_endpoint_function(name, description, func, input_model, output_model)
return run
def create_response_model(name: str, output_model: Type[BaseModel]) -> Type[ResponseModel]:
"""
Create a response model for the given schema.
"""
# Create a new response model
response_model = create_model(
f"{name}Response",
code=(int, CustomResponseCode.HTTP_200.code),
msg=(str, CustomResponseCode.HTTP_200.msg),
data=(output_model, None)
)
return response_model
def generate_endpoint(schemas: List[ToolSchema]) -> APIRouter:
routers = []
@ -68,15 +56,15 @@ def generate_endpoint(schemas: List[ToolSchema]) -> APIRouter:
output_model=schema.output_model
)
response_model = create_response_model(schema.name, schema.output_model)
# Add the endpoint to the FastAPI app
router.post(
f"/{schema.name}",
name=schema.name,
summary=schema.description,
tags=[schema.meta.module],
response_model=response_model,
response_model=schema.output_model,
response_model_exclude_unset=True,
response_model_exclude_none=True,
response_description=create_output_description(schema.output_model)
)(run)
@ -98,6 +86,6 @@ def create_output_description(output_model: Type[BaseModel]) -> str:
output_description += "\n\n**Attributes:**\n\n"
for name, field in output_model.model_fields.items():
output_description += f"- **{name}** ({field.annotation.__name__}): {field.description}\n"
output_description += f"- **{name}** ({field.annotation.__name__})\n"
return output_description

View file

@ -1,11 +1,12 @@
import uvicorn
from pathlib import Path
import asyncio
from toolserve.common.log import log
from toolserve.server.core.conf import settings
from toolserve.server.core.registrar import register_app
app = register_app()
if __name__ == '__main__':