arcade-mcp/examples/sql-chat/agent.py
2024-05-01 23:53:53 -07:00

471 lines
16 KiB
Python

import httpx
import json
import time
import openai
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from typing import List, Dict
from textwrap import dedent
from pydantic import BaseModel, Field
from enum import Enum
from typing import Type
from toolserve.utils.openai_tool import model_to_json_schema
from typing import Dict, Any, Optional
import json
from collections import deque
class ToolClient:
available_tools = {
"query_sql": "/tool/query/query_sql",
"list_data_sources": "/tool/query/list_data_sources",
"get_data_schema": "/tool/query/get_data_schema",
"PlotDataframe": "/tool/gmailer/PlotDataframe",
"ReadEmail": "/tool/gmailer/ReadEmail",
"Summarize": "/tool/chat/Summarize",
}
def __init__(self, base_url: str):
self.base_url = base_url
self.client = httpx.Client()
self.tools = self.__collect_tool_specs()
def __collect_tool_specs(self) -> Dict[str, str]:
tools = {}
for tool_name, endpoint in self.available_tools.items():
openai_spec = self.call_api("GET", "/api/v1/tools/oai_function", params={"tool_name": tool_name}).get("data", {})
tools[tool_name] = openai_spec
return tools
def call_api(self, method: str, endpoint: str, params: dict = {}, data: dict = {}, json_data: dict = {}) -> Dict[str, Any]:
"""Call the Darkstar Toolserver API with the given parameters.
Args:
method (str): The HTTP method to use for the request.
endpoint (str): The endpoint to call.
params (dict): The query parameters for the request.
data (dict): The data to send in the request body.
json_data (dict): The JSON data to send in the request body.
Returns:
Dict[str, Any]: The response from the API.
"""
url = f"{self.base_url}{endpoint}"
response = self.client.request(method, url, params=params, json=json_data, data=data)
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
print(f"HTTP error: {e}")
result = response.json()
return result
def execute_tool(self, tool_name: str, tool_args: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""
Executes an tool using the Darkstar Toolserver API and an OpenAI model.
:param tool_name: The name of the tool to execute.
:return: The result of the tool
"""
# Prepare the input message for the OpenAI model
endpoint = self.available_tools[tool_name]
result = self.call_api("POST", endpoint, json_data=tool_args)
return result
class ToolRunner:
tool_prompt = dedent("""
Given a user query and the schema of the fields in a dataframe, generate the arguments for a tool to execute.
YOU MUST CALL THE TOOL.
The schema of the fields in the dataframe is as follows:
{schema}
If needed, the data_id for the source is: {data_id}
If needed, the output_name should be: {output_name}
""")
def __init__(self, base_url: str, model: str, api_key: str):
"""
Initialize the ToolRunner with necessary configurations.
Args:
base_url (str): The base URL for the API calls.
model (str): The model identifier to be used for queries.
api_key (str): The API key for authentication.
"""
self._client = ToolClient(base_url)
self._model = model
self._openai_client = openai.Client(api_key=api_key)
self._data_sources = self.__get_data_sources()
self._source = None
self._data_schema = None
self._data_id = None
def set_source(self, source: str):
self._data_sources = self.__get_data_sources()
retries = 3
data_id = None
while retries > 0:
try:
data_id = self._data_sources[source]
break
except KeyError:
retries -= 1
time.sleep(1)
self._data_sources = self.__get_data_sources()
if data_id is None:
raise ValueError(f"Data source '{source}' not found.")
# get the schema
schema = self._client.call_api("POST", "/tool/query/get_data_schema", json_data={"data_id": data_id})
self._source = source
self._data_schema = schema
self._data_id = data_id
def __get_data_sources(self) -> Dict[str, Dict[str, str]]:
response = self._client.call_api("POST", "/tool/query/list_data_sources")
sources = {}
for _id, source_data in response["data"]["result"].items():
sources[source_data["file_name"]] = _id
return sources
def __create_prompt(self, user_query: str, input_name: str, output_name: str) -> List[Dict[str, str]]:
schema = self._data_schema
data_id = self._data_sources[input_name]
prompt = self.tool_prompt.format(schema=schema, data_id=data_id, output_name=output_name)
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": user_query}
]
return messages
def get_tool_args(self, tool_name: str, messages: List[Dict[str, str]], output_name: str) -> Dict[str, Any]:
"""
Retrieves the required arguments for an tool from the Darkstar Toolserver API and
uses them to call an OpenAI model with predefined tools and messages.
:param tool_name: The name of the tool to execute.
:param messages: A list of messages to provide to the model.
:return: The result of the OpenAI model call.
"""
func_spec = self._client.tools.get(tool_name, {})
if not func_spec:
raise ValueError(f"Tool '{tool_name}' not found in available tools.")
tool = json.loads(func_spec)
print(tool)
# Call the OpenAI model with the tools and messages
completion = self._openai_client.chat.completions.create(
model="gpt-4-turbo",
messages=messages,
tools=[tool],
tool_choice="required"
)
predicted_args = completion.choices[0].message.tool_calls[0].function.arguments
args = json.loads(predicted_args)
if "params" in args:
params = args.get("params", [])
if params:
if isinstance(params, dict):
args["params"] = list(params.values())
elif isinstance(params, str):
args["params"] = [params]
elif isinstance(params, list):
args["params"] = params
else:
raise ValueError(f"Invalid params type: {type(params)}")
if "output_name" in args:
args["output_name"] = output_name
if "data_id" in args:
args["data_id"] = self._data_id
return args
def run_tool(self, tool_name: str, user_query: str, source: str, output_name: str) -> Any:
"""
Executes an tool using the Darkstar Toolserver API and an OpenAI model.
:param tool_name: The name of the tool to execute.
:param user_query: The user query to provide to the model.
:return: The result of the tool
"""
self.set_source(source)
messages = self.__create_prompt(user_query, source, output_name)
tool_args = self.get_tool_args(tool_name, messages, output_name)
result = self._client.execute_tool(tool_name, tool_args)
return result
def get_data_object(self, data_id: int) -> Dict[str, Any]:
"""
Retrieves a data object from the Darkstar Toolserver API.
:param data_id: The ID of the data object to retrieve.
:return: The data object.
"""
return self._client.call_api("GET", f"/api/v1/data/object/{data_id}")["data"]["json_blob"]
def pydantic_to_openai_tool(model: Type[BaseModel]) -> str:
"""
Convert a Pydantic model to an OpenAI tool schema.
Args:
model (Type[BaseModel]): The Pydantic model to convert.
Returns:
str: The OpenAI tool schema.
"""
schema = model_to_json_schema(model)
tool_schema = {
"type": "function",
"function": {
"name": model.__name__,
"description": model.__doc__ or "",
"parameters": schema
}
}
return json.dumps(tool_schema)
class Edge(BaseModel):
source: int = Field(..., description="The ID of the source node")
target: int = Field(..., description="The ID of the target node")
class ToolNode(BaseModel):
node_id: int = Field(..., description="The ID of the node", ge=0)
input_name: str = Field(..., description="The name of the input data")
tool_name: str = Field(..., description="The name of the tool to execute")
output_name: str = Field(..., description="The name of the output data")
class OutputType(Enum):
DATA = "data"
CHAT = "chat"
ARTIFACT = "artifact"
class FlowSchema(BaseModel):
"""A graph based representation of functions (nodes), and their data flow (edges)"""
nodes: List[ToolNode] = Field(..., description="The nodes in the flow")
edges: List[Edge] = Field([], description="The IDs of the adjacent nodes")
output_type: OutputType = Field(OutputType.CHAT, description="The type of the output")
class Config:
arbitrary_types_allowed = True
use_enum_values = True
class ToolFlow:
tools = {
"query_sql": (OutputType.DATA, True, False),
"PlotDataframe": (OutputType.ARTIFACT, False, True),
"ReadEmail": (OutputType.CHAT, True, False),
"Summarize": (OutputType.CHAT, False, True),
}
def __init__(
self,
name: str,
description: str,
prompt: str,
base_url: str = "http://localhost:8000",
model: str = "gpt-4-turbo",
model_api_key: Optional[str] = None
):
self.name = name
self.description = description
self.prompt = prompt
self.runner = ToolRunner(base_url, model, model_api_key)
self.model = model
self.openai_client = openai.Client(api_key=model_api_key)
def __create_prompt(self, user_query: str) -> List[Dict[str, str]]:
tool_list = ""
for tool, spec in self.tools.items():
tool_list += f"- Name: {tool}\n"
tool_list += f" - Output Type: {spec[0].value}\n"
tool_list += f" - Can be source node: {spec[1]}\n"
tool_list += f" - Can be sink node: {spec[2]}\n"
source_list = "\n".join(self.runner._data_sources.keys())
prompt = self.prompt.format(nodes=tool_list, sources=source_list)
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": user_query}
]
return messages
def infer_flow(self, user_query: str) -> FlowSchema:
"""
Infer the tool flow based on the user query.
Args:
user_query (str): The user's query string.
Returns:
FlowSchema: The inferred tool flow schema.
"""
messages = self.__create_prompt(user_query)
func_spec = pydantic_to_openai_tool(FlowSchema)
tool = json.loads(func_spec)
# Call the OpenAI model with the tools and messages
completion = self.openai_client.chat.completions.create(
model=self.model,
messages=messages,
tools=[tool],
tool_choice="required"
)
predicted_args = completion.choices[0].message.tool_calls[0].function.arguments
print(predicted_args)
return predicted_args
def execute_flow(self, flow_schema: Dict[str, Any], user_query: str) -> Any:
"""
Executes the tool flow based on the provided schema. This method performs a breadth-first search (BFS)
on the graph defined by the flow schema and executes each node according to the order determined by the BFS.
Args:
flow_schema (Dict[str, Any]): The schema representing the tool flow to be executed.
user_query (str): The user's query string that may influence tool execution.
Returns:
Any: The result of executing the tool flow.
"""
# Initialize a queue for BFS
execution_queue = deque([flow_schema['nodes'][0]]) # Start BFS from the source node
visited = set()
results = {}
while execution_queue:
current_node = execution_queue.popleft()
node_id = current_node['node_id']
if node_id in visited:
continue
visited.add(node_id)
# Execute the current node's operation using runner.run_tool
operation_result = self.runner.run_tool(
current_node['tool_name'],
user_query,
current_node['input_name'],
current_node['output_name']
)
results[node_id] = operation_result
# Enqueue all adjacent nodes
for edge in flow_schema.get('edges', []):
if edge['source'] == node_id:
target_node_id = edge['target']
target_node = next(node for node in flow_schema['nodes'] if node['node_id'] == target_node_id)
if target_node_id not in visited:
execution_queue.append(target_node)
# Assuming the last node processed is the sink node
sink_node = flow_schema['nodes'][-1]
sink_tool_name = sink_node['tool_name']
sink_node_id = sink_node['node_id']
sink_output_type = self.tools[sink_tool_name][0]
if sink_output_type == OutputType.DATA:
data = self.runner.get_data_object(self.runner._data_id)
else:
data = results[sink_node_id]
return (data, results, sink_output_type)
def summarize_flow_results(model_client, flow_results: Dict[str, Any], flow_schema) -> str:
"""
Summarizes the results of a tool flow execution using an OpenAI model to generate a chat response.
Args:
model_client (openai.Client): The OpenAI client to use for generating chat responses.
flow_results (Dict[str, Any]): The results of the tool flow execution.
flow_schema (Dict[str, Any]): The schema representing the tool flow.
Returns:
Dict[str, str]: A dictionary containing the chat response under the key "data".
"""
try:
# Check if flow_results is already a JSON string, otherwise convert it
if isinstance(flow_results, str):
flow_summary = flow_results
else:
flow_summary = json.dumps(flow_results, indent=2)
# Construct a concise and informative prompt for the chat model
prompt_content = dedent(f"""
Please review the tool execution results and the flow schema provided below.
Use the results of the final tool to describe the outcomes. Be concise and only use the provided information.
If the results seem incorrect or incomplete, kindly ask the user to reformulate their query for better accuracy.
The execution path, expressed a a JSON object where nodes represent tools and edges represent data flow:
{flow_schema}
The results of the execution, expressed as a JSON object:
{flow_summary}
""")
messages = [
{"role": "system", "content": prompt_content}
]
# Call the OpenAI chat model
response = model_client.chat.completions.create(
model="gpt-4-turbo",
messages=messages
)
# Extract the chat response
chat_response = response.choices[0].message.content
return chat_response
except Exception as e:
print(f"Error in summarizing flow results: {e}")
return "Error: Failed to generate summary due to an internal error."
#flow_schema = tf.infer_flow("Plot the users' age distribution")
#from pprint import pprint
#flow = json.loads(flow_schema)
#pprint(flow)
#result = tf.execute_flow(flow, "Plot the users' age distribution")
#print(result)