From 6272a426f14fb0bff13005b87d07976633921f99 Mon Sep 17 00:00:00 2001 From: Sam Partee Date: Wed, 8 May 2024 21:25:23 -0700 Subject: [PATCH] 4 simple flows working --- examples/gmail/pack.lock.toml | 2 + examples/gmail/pack.toml | 2 + examples/gmail/tools/chat.py | 12 +- examples/gmail/tools/gmailer.py | 46 +-- examples/gmail/tools/read_sqlite.py | 43 +++ examples/gmail/tools/search.py | 65 ++++ examples/sql-chat/agent.py | 340 ++++++++++--------- examples/sql-chat/main.py | 50 +-- toolserve/toolserve/builtin/default/data.py | 107 ++++++ toolserve/toolserve/builtin/default/query.py | 12 +- toolserve/toolserve/server/core/catalog.py | 9 +- toolserve/toolserve/server/core/conf.py | 8 + 12 files changed, 484 insertions(+), 212 deletions(-) create mode 100644 examples/gmail/tools/read_sqlite.py create mode 100644 examples/gmail/tools/search.py create mode 100644 toolserve/toolserve/builtin/default/data.py diff --git a/examples/gmail/pack.lock.toml b/examples/gmail/pack.lock.toml index ae60afcb..302aece9 100644 --- a/examples/gmail/pack.lock.toml +++ b/examples/gmail/pack.lock.toml @@ -11,4 +11,6 @@ email = "sam@partee.io" SendEmail = "gmailer.send_email@0.1.0" ReadEmail = "gmailer.read_email@0.1.0" PlotDataframe = "gmailer.plot_dataframe@0.1.0" +ReadSqlite = "read_sqlite.read_sqlite@0.1.0" Summarize = "chat.summarize@0.1.0" +VectorSearch = "search.vector_search@0.1.0" diff --git a/examples/gmail/pack.toml b/examples/gmail/pack.toml index c5f9f76f..283aa2bc 100644 --- a/examples/gmail/pack.toml +++ b/examples/gmail/pack.toml @@ -10,3 +10,5 @@ email = "sam@partee.io" [modules] gmailer = "0.1.0" chat = "0.1.0" +search = "0.1.0" +read_sqlite = "0.1.0" diff --git a/examples/gmail/tools/chat.py b/examples/gmail/tools/chat.py index 5687ed37..124bc1de 100644 --- a/examples/gmail/tools/chat.py +++ b/examples/gmail/tools/chat.py @@ -7,8 +7,8 @@ import openai @tool async def summarize( - #text: Param(str, "Text to summarize"), - data_id: Param(int, "ID of the data to summarize"), + text: Param(str, "Text to summarize"), + #data_id: Param(int, "ID of the data to summarize"), system_prompt: Param(str, "System prompt to use") = "Summarize the following text", max_tokens: Param(int, "Maximum number of tokens to generate") = 1000, ) -> Param(str, "Summarized text"): @@ -21,11 +21,15 @@ async def summarize( Returns: str: The summarized text. """ - df = await get_df(data_id) - text = df.to_json(orient='records') + #df = await get_df(data_id) + #text = df.to_json(orient='records') api_key = get_secret("openai_api_key", None) model = get_secret("openai_model_summarize", "gpt-4-turbo") # Call the OpenAI model with the tools and messages + + if isinstance(text, list): + text = "\n".join(text) + messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": text}, diff --git a/examples/gmail/tools/gmailer.py b/examples/gmail/tools/gmailer.py index 337a8cae..6f8885fa 100644 --- a/examples/gmail/tools/gmailer.py +++ b/examples/gmail/tools/gmailer.py @@ -48,9 +48,9 @@ async def send_email( @tool async def read_email( - output_name: Param(str, "Name of the output data"), + #output_name: Param(str, "Name of the output data"), n_emails: Param(int, "Number of emails to read") = 5, - ): + ) -> Param(str, "emails"): """Read emails from a Gmail account and extract plain text content, removing any HTML.""" email_address = get_secret("gmail_email") @@ -70,31 +70,37 @@ async def read_email( emails = [] for email_id in email_ids[:n_emails]: - result, data = mail.fetch(email_id, "(RFC822)") - raw_email = data[0][1] - msg = email.message_from_bytes(raw_email) + try: + result, data = mail.fetch(email_id, "(RFC822)") + raw_email = data[0][1] + msg = email.message_from_bytes(raw_email) - email_details = { - "from": msg["From"], - "to": msg["To"], - "date": msg["Date"] - } + email_details = { + "from": msg["From"], + "to": msg["To"], + "date": msg["Date"] + } - if msg.is_multipart(): - for part in msg.walk(): - if part.get_content_type() == "text/plain": - body = part.get_payload(decode=True).decode('utf-8') - email_details["body"] = clean_email_body(body) - else: - body = msg.get_payload(decode=True).decode('utf-8') - email_details["body"] = clean_email_body(body) + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + body = part.get_payload(decode=True).decode('utf-8') + email_details["body"] = clean_email_body(body) + else: + body = msg.get_payload(decode=True).decode('utf-8') + email_details["body"] = clean_email_body(body) + except Exception as e: + print(f"Error reading email {email_id}: {e}") + continue emails.append(email_details) mail.close() mail.logout() - df = pd.DataFrame(emails) - await save_df(df, output_name) + #df = pd.DataFrame(emails) + #await save_df(df, output_name) + data = "\n".join([f"{email['from']} - {email['date']}\n{email['body']}\n" for email in emails]) + return data diff --git a/examples/gmail/tools/read_sqlite.py b/examples/gmail/tools/read_sqlite.py new file mode 100644 index 00000000..1fdde656 --- /dev/null +++ b/examples/gmail/tools/read_sqlite.py @@ -0,0 +1,43 @@ + +from toolserve.sdk import Param, tool, get_secret +from toolserve.sdk.dataframe import save_df +import pandas as pd + +from sqlite3 import connect + +@tool +async def read_sqlite( + file_path: Param(str, "Path to the SQLite database file"), + table_name: Param(str, "Name of the table to read from"), + output_name: Param(str, "Name of the output data to save"), + ) -> Param(str, "Output data name"): + """Read data from a SQLite database table and save it as a DataFrame. + + Args: + file_path (str): Path to the SQLite database file. + table_name (str): Name of the table to read from. + output_name (str): Name of the output data to save. + + Returns: + str: Name of the output data. + """ + # Connect to the SQLite database + conn = connect(file_path) + cursor = conn.cursor() + + # Read the data from the table + query = f"SELECT * FROM {table_name}" + cursor.execute(query) + rows = cursor.fetchall() + + # Get the column names + cursor.execute(f"PRAGMA table_info({table_name})") + columns = [col[1] for col in cursor.fetchall()] + + # Create a DataFrame from the data + df = pd.DataFrame(rows, columns=columns) + + # Save the DataFrame + await save_df(df, output_name) + + return output_name \ No newline at end of file diff --git a/examples/gmail/tools/search.py b/examples/gmail/tools/search.py new file mode 100644 index 00000000..fff55493 --- /dev/null +++ b/examples/gmail/tools/search.py @@ -0,0 +1,65 @@ + + + +import faiss +import numpy as np + +from typing import List +from fastembed import TextEmbedding +from toolserve.sdk import Param, tool, get_secret +from toolserve.sdk.dataframe import get_df + + +@tool +async def vector_search( + data_id: Param(int, "The ID of the data source containing the documents"), + query: Param(str, "The text to find within the documents"), + column_name: Param(str, "The name of the column containing the documents"), + n_results: Param(int, "The number of top results to return") = 5 +) -> Param(List[str], "The documents most similar to the query"): + """Create a FAISS index from a list of documents and search for the query, returning the most similar documents. + + Args: + query (str): The text query to search for. Should be written like a document. + column_name (str): The name of the column containing the documents. + n_results (int, optional): The number of top results to return. Defaults to 5. + + Returns: + List[str]: The documents most similar to the query based on the search. + """ + # Get the data + df = await get_df(data_id) + docs = df[column_name].tolist() + + # Initialize the embedding model + embedding_tool = TextEmbedding() + + # Embed all documents + embeddings = [] + for doc in docs: + # Get the generator from the embed method + doc_embedding_generator = embedding_tool.embed([doc]) + # Convert the generator to a list and take the first element + doc_embedding = list(doc_embedding_generator)[0] + embeddings.append(doc_embedding) + + # Convert list of embeddings to a numpy array and ensure type float32 + embeddings = np.vstack(embeddings).astype('float32') + + # Create a flat L2 index + dimension = embeddings.shape[1] + index = faiss.IndexFlatL2(dimension) + index.add(embeddings) # Add embeddings to the index + + # Embed the query + query_embedding_generator = embedding_tool.embed([query]) + query_embedding = list(query_embedding_generator)[0] + query_embedding = np.array(query_embedding, dtype='float32').reshape(1, -1) + + # Search the index + distances, indices = index.search(query_embedding, n_results) + + # Fetch the documents corresponding to the top indices + top_docs = [docs[i] for i in indices.flatten().tolist()] + + return top_docs \ No newline at end of file diff --git a/examples/sql-chat/agent.py b/examples/sql-chat/agent.py index 87304823..f1e371d4 100644 --- a/examples/sql-chat/agent.py +++ b/examples/sql-chat/agent.py @@ -2,7 +2,7 @@ import httpx import json import time import openai -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple, Union from pydantic import BaseModel from typing import List, Dict @@ -17,30 +17,77 @@ from typing import Dict, Any, Optional import json from collections import deque +def pydantic_to_openai_tool(model: Type[BaseModel]) -> str: + """ + Convert a Pydantic model to an OpenAI tool schema. + + Args: + model (Type[BaseModel]): The Pydantic model to convert. + + Returns: + str: The OpenAI tool schema. + """ + schema = model_to_json_schema(model) + tool_schema = { + "type": "function", + "function": { + "name": model.__name__, + "description": model.__doc__ or "", + "parameters": schema + } + } + return json.dumps(tool_schema) + +class Edge(BaseModel): + source: int = Field(..., description="The ID of the source node") + target: int = Field(..., description="The ID of the target node") + +class ToolNode(BaseModel): + node_id: int = Field(..., description="The ID of the node", ge=0) + input_name: Optional[str] = Field(None, description="The name of the input data") + tool_name: str = Field(..., description="The name of the tool to execute") + output_name: Optional[str] = Field(None, description="The name of the output data") + predict_args: bool = Field(True, description="Whether to predict the arguments for the tool") + from_node: Optional[Dict[str, int]] = Field(None, description="The ID of the source node name of the argument to pass to the tool") + args: Optional[Dict[str, Any]] = Field(None, description="The arguments to pass to the tool") + + +class OutputType(Enum): + DATA = "data" + CHAT = "chat" + ARTIFACT = "artifact" + +class FlowSchema(BaseModel): + """A graph based representation of functions (nodes), and their data flow (edges)""" + + nodes: List[ToolNode] = Field(..., description="The nodes in the flow") + edges: List[Edge] = Field([], description="The IDs of the adjacent nodes") + output_type: OutputType = Field(OutputType.CHAT, description="The type of the output") + + class Config: + arbitrary_types_allowed = True + use_enum_values = True class ToolClient: - available_tools = { - "query_sql": "/tool/query/query_sql", - "list_data_sources": "/tool/query/list_data_sources", - "get_data_schema": "/tool/query/get_data_schema", - "PlotDataframe": "/tool/gmailer/PlotDataframe", - "ReadEmail": "/tool/gmailer/ReadEmail", - "Summarize": "/tool/chat/Summarize", - } - def __init__(self, base_url: str): self.base_url = base_url - self.client = httpx.Client(timeout=30) - self.tools = self.__collect_tool_specs() + self.client = httpx.Client(timeout=3000) + tools, routes = self.__collect_tool_specs() + self.tools = tools + self.available_tools = routes + def __collect_tool_specs(self) -> Dict[str, str]: + tools_list = self.call_api("GET", "/api/v1/tools/list").get("data", {}) + all_tools = [tool["name"] for tool in tools_list] + routes = {tool["name"]: tool["endpoint"] for tool in tools_list} tools = {} - for tool_name, endpoint in self.available_tools.items(): + for tool_name, endpoint in routes.items(): openai_spec = self.call_api("GET", "/api/v1/tools/oai_function", params={"tool_name": tool_name}).get("data", {}) tools[tool_name] = openai_spec - return tools + return tools, routes def call_api(self, method: str, endpoint: str, params: dict = {}, data: dict = {}, json_data: dict = {}) -> Dict[str, Any]: """Call the Darkstar Toolserver API with the given parameters. @@ -173,7 +220,6 @@ class ToolRunner: raise ValueError(f"Tool '{tool_name}' not found in available tools.") tool = json.loads(func_spec) - print(tool) # Call the OpenAI model with the tools and messages completion = self._openai_client.chat.completions.create( model="gpt-4-turbo", @@ -198,26 +244,37 @@ class ToolRunner: if "output_name" in args and output_name != "None": args["output_name"] = output_name - if "data_id" in args: - args["data_id"] = self._data_id return args - def run_tool(self, tool_name: str, user_query: str, source: str, output_name: str) -> Any: + def run_tool(self, tool: ToolNode, user_query: str, **kwargs) -> Any: """ Executes an tool using the Darkstar Toolserver API and an OpenAI model. - - :param tool_name: The name of the tool to execute. - :param user_query: The user query to provide to the model. - :return: The result of the tool """ + source = None + if tool.input_name: + source = tool.input_name self.set_source(source) - print(f"Tool Name: {tool_name}") - print(f"Data ID: {self._data_id}") - print(f"Sourcing data from {source}") - messages = self.__create_prompt(user_query, source, output_name) - tool_args = self.get_tool_args(tool_name, messages, output_name) - result = self._client.execute_tool(tool_name, tool_args) + + if tool.predict_args: + messages = self.__create_prompt(user_query, source, tool.output_name) + tool_args = self.get_tool_args(tool.tool_name, messages, tool.output_name) + elif tool.from_node: + # todo change to list + tool_args = kwargs.get("tool_args", {}) + else: + tool_args = {} + + # TODO would something ever have an input_name and not need a data_id? + if tool.input_name: + tool_args["data_id"] = self._data_id + + if tool.args: + tool_args.update(tool.args) + + + print("Calling tool with args:", tool_args) + result = self._client.execute_tool(tool.tool_name, tool_args) return result def get_data_object(self, data_id: int) -> Dict[str, Any]: @@ -230,63 +287,10 @@ class ToolRunner: return self._client.call_api("GET", f"/api/v1/data/object/{data_id}")["data"]["json_blob"] -def pydantic_to_openai_tool(model: Type[BaseModel]) -> str: - """ - Convert a Pydantic model to an OpenAI tool schema. - Args: - model (Type[BaseModel]): The Pydantic model to convert. - - Returns: - str: The OpenAI tool schema. - """ - schema = model_to_json_schema(model) - tool_schema = { - "type": "function", - "function": { - "name": model.__name__, - "description": model.__doc__ or "", - "parameters": schema - } - } - return json.dumps(tool_schema) - -class Edge(BaseModel): - source: int = Field(..., description="The ID of the source node") - target: int = Field(..., description="The ID of the target node") - -class ToolNode(BaseModel): - node_id: int = Field(..., description="The ID of the node", ge=0) - input_name: Optional[str] = Field(None, description="The name of the input data") - tool_name: str = Field(..., description="The name of the tool to execute") - output_name: Optional[str] = Field(..., description="The name of the output data") - -class OutputType(Enum): - DATA = "data" - CHAT = "chat" - ARTIFACT = "artifact" - -class FlowSchema(BaseModel): - """A graph based representation of functions (nodes), and their data flow (edges)""" - - nodes: List[ToolNode] = Field(..., description="The nodes in the flow") - edges: List[Edge] = Field([], description="The IDs of the adjacent nodes") - output_type: OutputType = Field(OutputType.CHAT, description="The type of the output") - - class Config: - arbitrary_types_allowed = True - use_enum_values = True class ToolFlow: - tools = { - "query_sql": (OutputType.DATA, True, False), - "PlotDataframe": (OutputType.ARTIFACT, False, True), - "ReadEmail": (OutputType.CHAT, True, False), - "Summarize": (OutputType.CHAT, False, True), - - } - def __init__( self, name: str, @@ -304,24 +308,6 @@ class ToolFlow: self.openai_client = openai.Client(api_key=model_api_key) - def __create_prompt(self, user_query: str) -> List[Dict[str, str]]: - tool_list = "" - for tool, spec in self.tools.items(): - tool_list += f"- Name: {tool}\n" - tool_list += f" - Output Type: {spec[0].value}\n" - tool_list += f" - Can be source node: {spec[1]}\n" - tool_list += f" - Can be sink node: {spec[2]}\n" - - source_list = "\n".join(self.runner._data_sources.keys()) - - prompt = self.prompt.format(nodes=tool_list, sources=source_list) - - messages = [ - {"role": "system", "content": prompt}, - {"role": "user", "content": user_query} - ] - return messages - def infer_flow(self, user_query: str) -> FlowSchema: """ Infer the tool flow based on the user query. @@ -364,10 +350,17 @@ class ToolFlow: # Initialize a queue for BFS - execution_queue = deque([flow_schema['nodes'][0]]) # Start BFS from the source node + # Queue up all nodes which don't have incoming edges + incoming_edges = {node['node_id']: 0 for node in flow_schema['nodes']} + for edge in flow_schema.get('edges', []): + incoming_edges[edge['target']] += 1 + execution_queue = deque([node for node in flow_schema['nodes'] if incoming_edges[node['node_id']] == 0]) + visited = set() results = {} + timings = {} + flow_start_time = time.time() while execution_queue: current_node = execution_queue.popleft() node_id = current_node['node_id'] @@ -376,14 +369,22 @@ class ToolFlow: continue visited.add(node_id) + exec_start_time = time.time() # Execute the current node's operation using runner.run_tool - operation_result = self.runner.run_tool( - current_node['tool_name'], - user_query, - current_node['input_name'], - current_node['output_name'] - ) + current_tool = ToolNode(**current_node) + if current_tool.from_node: + tool_args = {} + for arg_name, from_node_id in current_tool.from_node.items(): + from_node_result = results[from_node_id]["data"]["result"] + tool_args[arg_name] = from_node_result + + operation_result = self.runner.run_tool(current_tool, user_query, tool_args=tool_args) + else: + operation_result = self.runner.run_tool(current_tool, user_query) + results[node_id] = operation_result + exec_end_time = time.time() + timings[current_tool.tool_name] = exec_end_time - exec_start_time # Enqueue all adjacent nodes for edge in flow_schema.get('edges', []): @@ -397,7 +398,13 @@ class ToolFlow: sink_node = flow_schema['nodes'][-1] sink_tool_name = sink_node['tool_name'] sink_node_id = sink_node['node_id'] - sink_output_type = self.tools[sink_tool_name][0] + # TODO: Tools need to specify output type + #sink_output_type = self.tools[sink_tool_name][0] + sink_output_type = OutputType(flow_schema['output_type']) + + flow_end_time = time.time() + timings['total'] = flow_end_time - flow_start_time + if sink_output_type == OutputType.DATA: data = self.runner.get_data_object(self.runner._data_id) elif sink_output_type == OutputType.CHAT: @@ -405,59 +412,7 @@ class ToolFlow: else: data = results[sink_node_id] - return (data, results, sink_output_type) - - -def summarize_flow_results(model_client, flow_results: Dict[str, Any], flow_schema) -> str: - """ - Summarizes the results of a tool flow execution using an OpenAI model to generate a chat response. - - Args: - model_client (openai.Client): The OpenAI client to use for generating chat responses. - flow_results (Dict[str, Any]): The results of the tool flow execution. - flow_schema (Dict[str, Any]): The schema representing the tool flow. - - - Returns: - Dict[str, str]: A dictionary containing the chat response under the key "data". - """ - try: - # Check if flow_results is already a JSON string, otherwise convert it - if isinstance(flow_results, str): - flow_summary = flow_results - else: - flow_summary = json.dumps(flow_results, indent=2) - - # Construct a concise and informative prompt for the chat model - prompt_content = dedent(f""" - Please review the tool execution results and the flow schema provided below. - Use the results of the final tool to describe the outcomes. Be concise and only use the provided information. - If the results seem incorrect or incomplete, kindly ask the user to reformulate their query for better accuracy. - - The execution path, expressed a a JSON object where nodes represent tools and edges represent data flow: - {flow_schema} - - The results of the execution, expressed as a JSON object: - {flow_summary} - - """) - - messages = [ - {"role": "system", "content": prompt_content} - ] - - # Call the OpenAI chat model - response = model_client.chat.completions.create( - model="gpt-4-turbo", - messages=messages - ) - - # Extract the chat response - chat_response = response.choices[0].message.content - return chat_response - except Exception as e: - print(f"Error in summarizing flow results: {e}") - return "Error: Failed to generate summary due to an internal error." + return (data, results, sink_output_type, timings) @@ -474,7 +429,7 @@ plotting_flow = FlowSchema( ) -email_flow = FlowSchema( +email_flow_1 = FlowSchema( nodes=[ ToolNode(node_id=0, input_name=None, tool_name="ReadEmail", output_name="email_data_1"), ToolNode(node_id=1, input_name="email_data_1", tool_name="Summarize", output_name=None), @@ -485,11 +440,68 @@ email_flow = FlowSchema( output_type=OutputType.CHAT ) +email_flow = FlowSchema( + nodes=[ + ToolNode(node_id=0, tool_name="ReadEmail"), + ToolNode(node_id=1, tool_name="Summarize", from_node={"text": 2}, predict_args=False), + ], + edges=[ + Edge(source=0, target=1) + ], + output_type=OutputType.CHAT +) -class Agent: +review_db = "/Users/spartee/Dropbox/Arcade/platform/toolserver/examples/data/food-reviews/database.sqlite" +review_flow = FlowSchema( + nodes=[ + ToolNode(node_id=0, tool_name="ReadSqlite", args={"table_name": "Reviews", "file_path": review_db, "output_name": "reviews"}, predict_args=False), + ToolNode(node_id=1, input_name="reviews", tool_name="query_sql", output_name="review_data"), + ToolNode(node_id=2, input_name="review_data", tool_name="search_text_columns"), + ToolNode(node_id=3, tool_name="Summarize", from_node={"text": 2}, predict_args=False), + ], + edges=[ + Edge(source=0, target=1), + Edge(source=1, target=2), + Edge(source=2, target=3) + ], + output_type=OutputType.CHAT +) - def __init__(self, flows: Dict[str, FlowSchema]): - self.flows = flows + +shopify_db = "/Users/spartee/Dropbox/Arcade/platform/toolserver/examples/data/olist.sqlite" +customer_flow = FlowSchema( + nodes=[ + ToolNode(node_id=0, tool_name="ReadSqlite", args={"table_name": "customers", "file_path": shopify_db, "output_name": "customers"}, predict_args=False), + ToolNode(node_id=1, tool_name="ReadSqlite", args={"table_name": "orders", "file_path": shopify_db, "output_name": "all_customer_orders"}, predict_args=False), + ToolNode(node_id=2, input_name="customers", tool_name="query_sql", output_name="customer_data"), + ToolNode(node_id=3, input_name="all_customer_orders", tool_name="query_sql", output_name="customer_orders"), + ToolNode(node_id=4, input_name="customer_data", tool_name="get"), + ToolNode(node_id=5, input_name="customer_orders", tool_name="get"), + ToolNode(node_id=6, tool_name="combine_results", from_node={"result_1": 4, "result_2": 5}, predict_args=False), + ToolNode(node_id=7, tool_name="Summarize", from_node={"text": 6}, predict_args=False) + ], + edges=[ + Edge(source=0, target=2), + Edge(source=1, target=3), + Edge(source=2, target=4), + Edge(source=3, target=5), + Edge(source=4, target=6), + Edge(source=5, target=6), + Edge(source=6, target=7) + ], + output_type=OutputType.CHAT +) + + + +def print_flow_as_yaml(data: Dict[str, Any]): + + data_dict = data.dict(exclude_unset=True) if isinstance(data, BaseModel) else data + # Convert the dictionary to a YAML formatted string + yaml_str = yaml.dump(data_dict, sort_keys=False) + + # Print the YAML string + print(yaml_str) diff --git a/examples/sql-chat/main.py b/examples/sql-chat/main.py index 4bd05f9b..5b2dbcf9 100644 --- a/examples/sql-chat/main.py +++ b/examples/sql-chat/main.py @@ -21,7 +21,7 @@ from streamlit_chat import message import streamlit.components.v1 as components from textwrap import dedent import plotly.express as px -from agent import ToolFlow, email_flow, plotting_flow +from agent import ToolFlow, email_flow, plotting_flow, review_flow, customer_flow @@ -67,8 +67,9 @@ def plot_flow(data: Dict[str, Any]): # Check if there are any nodes to determine a start node for bfs_layout if G.nodes: - start_node = next(iter(G.nodes)) # Get an arbitrary start node - pos = nx.bfs_layout(G, start_node) + #start_node = next(iter(G.nodes)) # Get an arbitrary start node + #pos = nx.bfs_layout(G, start_node) + pos = nx.spring_layout(G) else: pos = {} @@ -94,7 +95,7 @@ def get_agent(): # From here down is all the StreamLit UI. st.set_page_config(page_title="Arcade AI Demo", page_icon=":robot:", layout="wide") -dropdown_options = ["Gmailer", "PlotBot"] +dropdown_options = ["Gmailer", "PlotBot", "ReviewChat", "CustomerService"] selected_option = st.sidebar.selectbox("Select an App:", dropdown_options) st.sidebar.write(f"Selected App: {selected_option}") @@ -132,6 +133,10 @@ def submit(): json_flow = email_flow.dict() elif selected_option == "PlotBot": json_flow = plotting_flow.dict() + elif selected_option == "ReviewChat": + json_flow = review_flow.dict() + elif selected_option == "CustomerService": + json_flow = customer_flow.dict() else: st.error("Invalid option selected") return @@ -160,20 +165,27 @@ if st.session_state["generated"]: message(st.session_state["past"][i], is_user=True, key=str(i) + "_user") result = st.session_state["generated"][i] - res, all_results, output_type = result + result_tab, all_results_tab, times_tab = st.tabs(["Result", "All Results", "Execution Times"]) + res, all_results, output_type, timings = result + + with all_results_tab: + st.write(all_results) + with times_tab: + st.write(timings) + with result_tab: + output_type = output_type.value + if output_type == "artifact": + # plot the json returned in res + fig_json = res["data"]["result"] + # plot the json with ploylu atream lit + st.plotly_chart(json.loads(fig_json)) + elif output_type == "chat": + st.write(res) + elif output_type == "data": + json_res = json.loads(res)["data"] + st.dataframe(json_res) + else: + st.error("Returned result:") + st.error(res) - output_type = output_type.value - if output_type == "artifact": - # plot the json returned in res - fig_json = res["data"]["result"] - # plot the json with ploylu atream lit - st.plotly_chart(json.loads(fig_json)) - elif output_type == "chat": - st.write(res) - elif output_type == "data": - json_res = json.loads(res)["data"] - st.dataframe(json_res) - else: - st.error("Returned result:") - st.error(res) diff --git a/toolserve/toolserve/builtin/default/data.py b/toolserve/toolserve/builtin/default/data.py new file mode 100644 index 00000000..66ce32bc --- /dev/null +++ b/toolserve/toolserve/builtin/default/data.py @@ -0,0 +1,107 @@ + +from typing import List, Dict, Any + +from toolserve.sdk.dataframe import get_df, save_df +from toolserve.sdk.tool import tool, Param + +@tool +async def get( + data_id: Param(int, "ID of the data") + ) -> Param(str, "data"): + """Get data by ID""" + df = await get_df(data_id) + return df.to_json(orient='records') + +@tool +async def select_columns( + data_id: Param(int, "ID of the data"), + columns: Param(List[str], "Columns to select") + ) -> Param(str, "data"): + """Select columns from a DataFrame""" + df = await get_df(data_id) + df = df[columns] + return df.to_json(orient='records') + +@tool +async def filter_rows( + data_id: Param(int, "ID of the data"), + column: Param(str, "Column to filter"), + value: Param(str, "Value to filter by") + ) -> Param(str, "data"): + """Filter rows in a DataFrame""" + df = await get_df(data_id) + df = df[df[column] == value] + return df.to_json(orient='records') + +@tool +async def sort( + data_id: Param(int, "ID of the data"), + column: Param(str, "Column to sort by"), + ascending: Param(bool, "Sort ascending or descending") = True + ) -> Param(str, "data"): + """Sort a DataFrame by a column""" + df = await get_df(data_id) + df = df.sort_values(by=column, ascending=ascending) + return df.to_json(orient='records') + +@tool +async def group_by( + data_id: Param(int, "ID of the data"), + columns: Param(List[str], "Columns to group by"), + aggregations: Param(Dict[str, str], "Aggregations to perform") + ) -> Param(str, "data"): + """Group by columns and perform aggregations""" + df = await get_df(data_id) + df = df.groupby(columns).agg(aggregations) + return df.to_json(orient='records') + +@tool +async def join( + data_id1: Param(int, "ID of the first data"), + data_id2: Param(int, "ID of the second data"), + on: Param(str, "Column to join on"), + how: Param(str, "Type of join") = "inner" + ) -> Param(str, "data"): + """Join two DataFrames""" + df1 = await get_df(data_id1) + df2 = await get_df(data_id2) + df = df1.merge(df2, on=on, how=how) + return df.to_json(orient='records') + +@tool +async def search_text_columns( + data_id: Param(int, "ID of the data"), + query: Param(str, "Text to search for"), + column: Param(str, "Column to search in"), + max_rows: Param(int, "Maximum number of rows to return") = 50 + ) -> Param(str, "data"): + """Search text in columns + + Search for a text query in a specific column of a DataFrame. + + Args: + data_id (int): The ID of the data source to search in. + query (str): The text to search for. + column (str): The column to search in. + + Returns: + str: The data source after filtering for the text query, limited to a maximum number of rows. + """ + df = await get_df(data_id) + # Ensure the column data is treated as string + df[column] = df[column].astype(str) + # Use regex=False to treat the query as a literal string, avoiding any regex special character issues + mask = df[column].str.contains(query, case=False, na=False, regex=False) + df = df[mask] + # Limit the number of rows returned + df = df.head(max_rows) + return df.to_json(orient='records') + + +@tool +def combine_results( + result_1: Param(str, "First result"), + result_2: Param(str, "Second result") + ) -> Param(str, "data"): + """Combine two results""" + return str(result_1) + str(result_2) diff --git a/toolserve/toolserve/builtin/default/query.py b/toolserve/toolserve/builtin/default/query.py index 6872f38f..2a47ee06 100644 --- a/toolserve/toolserve/builtin/default/query.py +++ b/toolserve/toolserve/builtin/default/query.py @@ -53,10 +53,13 @@ async def query_sql( """Query a data source using SQL The SQL query should be parameterized with DuckDB's syntax. For example, to query a - DataFrame named `df` with a parameter `param`, the query should be `SELECT * FROM df WHERE column = ?`. + DataFrame named `df` with a parameter `param`, the query should be `SELECT * FROM df WHERE ? = ?`. The list of params should be in order of the parameters in the SQL query. + IMPORTANT: There should be no parameters in the query. + For example: `SELECT * FROM df WHERE name = ?` should be `SELECT * FROM df WHERE ? = ?`. + After the query, a new data source at a new id will be created with the results and the schema of the data source will be returned. @@ -71,7 +74,6 @@ async def query_sql( """ try: # Retrieve the DataFrame and execute the SQL query using DuckDB - import duckdb df = await get_df(data_id) con = duckdb.connect(database=':memory:', read_only=False) con.register('df_table', df) @@ -88,10 +90,12 @@ async def query_sql( except Exception as e: # Log the error and raise an exception - await log(f"Failed to execute query: {str(e)}", level="ERROR") + log_message = f"Failed to execute query: {str(e)}." + log_message += f" -- SQL: {sql}" + log_message += f" -- Parameters: {params}" + await log(log_message, level="ERROR") raise RuntimeError(f"Query execution failed: {str(e)}") - def get_df_info(df: pd.DataFrame, data_id: Optional[int]=None) -> Dict[str, Union[int, str]]: """ Generate a compact string representation of a DataFrame including the count of columns, diff --git a/toolserve/toolserve/server/core/catalog.py b/toolserve/toolserve/server/core/catalog.py index 710c24ec..3bc507ca 100644 --- a/toolserve/toolserve/server/core/catalog.py +++ b/toolserve/toolserve/server/core/catalog.py @@ -120,7 +120,14 @@ class ToolCatalog: return None def list_tools(self) -> List[Dict[str, str]]: - return [{'name': t.name, 'description': t.description} for t in self.tools.values()] + def get_tool_endpoint(t: ToolSchema) -> str: + return f"/tool/{t.meta.module}/{t.name}" + return [ + {'name': t.name, + 'description': t.description, + 'endpoint': get_tool_endpoint(t) + } for t in self.tools.values()] + diff --git a/toolserve/toolserve/server/core/conf.py b/toolserve/toolserve/server/core/conf.py index ba12b0ec..9c686669 100644 --- a/toolserve/toolserve/server/core/conf.py +++ b/toolserve/toolserve/server/core/conf.py @@ -24,6 +24,14 @@ class Settings(BaseSettings): "query.list_data_sources@builtin", "query.get_data_schema@builtin", "query.query_sql@builtin", + "data.get@builtin", + "data.select_columns@builtin", + "data.filter_rows@builtin", + "data.sort@builtin", + "data.group_by@builtin", + "data.join@builtin", + "data.search_text_columns@builtin", + "data.combine_results@builtin", ] # Env Config