unused graphs

This commit is contained in:
LUIS NOVO 2024-11-01 19:08:33 -03:00
parent 4f4abf7098
commit edf839cd1b
5 changed files with 11 additions and 268 deletions

View file

@ -1,47 +0,0 @@
from langchain_core.runnables import (
RunnableConfig,
)
from langgraph.graph import END, START, StateGraph
from typing_extensions import TypedDict
from open_notebook.config import load_default_models
from open_notebook.domain.notebook import Note, Notebook, Source
from open_notebook.graphs.utils import run_pattern
DEFAULT_MODELS, EMBEDDING_MODEL, SPEECH_TO_TEXT_MODEL = load_default_models()
class DocQueryState(TypedDict):
doc_id: str
doc_content: str
question: str
answer: str
notebook: Notebook
def call_model(state: dict, config: RunnableConfig) -> dict:
model_id = config.get("configurable", {}).get(
"model_id", DEFAULT_MODELS.default_transformation_model
)
return {"answer": run_pattern("doc_query", model_id, state)}
# todo: there is probably a better way to do this and avoid repetition
def get_content(state: DocQueryState) -> dict:
doc_id = state["doc_id"]
if "note:" in doc_id:
doc: Note = Note.get(id=doc_id)
elif "source:" in doc_id:
doc: Source = Source.get(id=doc_id)
doc_content = doc.get_context("long") if doc else None
return {"doc_content": doc_content}
agent_state = StateGraph(DocQueryState)
agent_state.add_node("get_content", get_content)
agent_state.add_node("agent", call_model)
agent_state.add_edge(START, "get_content")
agent_state.add_edge("get_content", "agent")
agent_state.add_edge("agent", END)
graph = agent_state.compile()

View file

@ -1,36 +0,0 @@
from langchain_core.runnables import (
RunnableConfig,
)
from langgraph.graph import END, START, StateGraph
from typing_extensions import TypedDict
from open_notebook.config import load_default_models
from open_notebook.graphs.utils import run_pattern
DEFAULT_MODELS, EMBEDDING_MODEL, SPEECH_TO_TEXT_MODEL = load_default_models()
class PatternState(TypedDict):
input_text: str
pattern: str
output: str
def call_model(state: dict, config: RunnableConfig) -> dict:
model_id = config.get("configurable", {}).get(
"model_id", DEFAULT_MODELS.default_transformation_model
)
return {
"output": run_pattern(
pattern_name=state["pattern"],
model_id=model_id,
state=state,
)
}
agent_state = StateGraph(PatternState)
agent_state.add_node("agent", call_model)
agent_state.add_edge(START, "agent")
agent_state.add_edge("agent", END)
graph = agent_state.compile()

View file

@ -1,81 +0,0 @@
import os
from typing import List, Literal
from langchain_core.runnables import (
RunnableConfig,
)
from langgraph.graph import END, START, StateGraph
from typing_extensions import TypedDict
from open_notebook.config import load_default_models
from open_notebook.graphs.utils import run_pattern
from open_notebook.utils import split_text
DEFAULT_MODELS, EMBEDDING_MODEL, SPEECH_TO_TEXT_MODEL = load_default_models()
class TocState(TypedDict):
chunks: List[str]
content: str
toc: str
def build_chunks(state: TocState) -> dict:
"""
Split the input text into chunks.
"""
return {
"chunks": split_text(
state["content"],
chunk=int(os.environ.get("SUMMARY_CHUNK_SIZE", 200000)),
overlap=int(os.environ.get("SUMMARY_CHUNK_OVERLAP", 1000)),
)
}
def setup_next_chunk(state: TocState) -> dict:
"""
Move the next item in the chunk to the processing area
"""
state["content"] = state["chunks"].pop(0)
return {"chunks": state["chunks"], "content": state["content"]}
def chunk_condition(state: TocState) -> Literal["get_chunk", END]: # type: ignore
"""
Checks whether there are more chunks to process.
"""
if len(state["chunks"]) > 0:
return "get_chunk"
return END
def call_model(state: TocState, config: RunnableConfig) -> dict:
model_id = config.get("configurable", {}).get(
"model_id", DEFAULT_MODELS.default_transformation_model
)
return {
"toc": run_pattern(
pattern_name="recursive_toc",
model_id=model_id,
state=state,
).content
}
agent_state = StateGraph(TocState)
agent_state.add_node("setup_chunk", build_chunks)
agent_state.add_edge(START, "setup_chunk")
agent_state.add_conditional_edges(
"setup_chunk",
chunk_condition,
)
agent_state.add_node("get_chunk", setup_next_chunk)
agent_state.add_node("agent", call_model)
agent_state.add_edge("get_chunk", "agent")
agent_state.add_conditional_edges(
"agent",
chunk_condition,
)
graph = agent_state.compile()

View file

@ -1,93 +0,0 @@
import os
from typing import List, Literal
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.runnables import (
RunnableConfig,
)
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel
from typing_extensions import TypedDict
from open_notebook.config import load_default_models
from open_notebook.graphs.utils import run_pattern
from open_notebook.utils import split_text
DEFAULT_MODELS, EMBEDDING_MODEL, SPEECH_TO_TEXT_MODEL = load_default_models()
class SummaryResponse(BaseModel):
"""This is schema of your response. Please provide a JSON object with the enclosed keys"""
summary: str
topics: List[str]
title: str
class SummaryState(TypedDict):
chunks: List[str]
content: str
output: SummaryResponse
def build_chunks(state: SummaryState) -> dict:
"""
Split the input text into chunks.
"""
return {
"chunks": split_text(
state["content"],
chunk=int(os.environ.get("SUMMARY_CHUNK_SIZE", 200000)),
overlap=int(os.environ.get("SUMMARY_CHUNK_OVERLAP", 1000)),
)
}
def setup_next_chunk(state: SummaryState) -> dict:
"""
Move the next item in the chunk to the processing area
"""
state["content"] = state["chunks"].pop(0)
return {"chunks": state["chunks"], "content": state["content"]}
def chunk_condition(state: SummaryState) -> Literal["get_chunk", END]: # type: ignore
"""
Checks whether there are more chunks to process.
"""
if len(state["chunks"]) > 0:
return "get_chunk"
return END
def call_model(state: dict, config: RunnableConfig) -> dict:
model_id = config.get("configurable", {}).get(
"model_id", DEFAULT_MODELS.default_transformation_model
)
parser = PydanticOutputParser(pydantic_object=SummaryResponse)
return {
"output": run_pattern(
pattern_name="summarize",
model_id=model_id,
state=state,
parser=parser,
)
}
agent_state = StateGraph(SummaryState)
agent_state.add_node("setup_chunk", build_chunks)
agent_state.add_edge(START, "setup_chunk")
agent_state.add_conditional_edges(
"setup_chunk",
chunk_condition,
)
agent_state.add_node("get_chunk", setup_next_chunk)
agent_state.add_node("agent", call_model)
agent_state.add_edge("get_chunk", "agent")
agent_state.add_conditional_edges(
"agent",
chunk_condition,
)
graph = agent_state.compile()

View file

@ -12,15 +12,15 @@ def get_current_timestamp() -> str:
return datetime.now().strftime("%Y%m%d%H%M%S")
@tool
def doc_query(doc_id: str, question: str):
"""
name: doc_query
Use this tool if you need to investigate into a particular document.
Another LLM will read the document and answer the question that you might have.
Use this when the user question cannot be answered with the content you have in context.
"""
from open_notebook.graphs.doc_query import graph
# @tool
# def doc_query(doc_id: str, question: str):
# """
# name: doc_query
# Use this tool if you need to investigate into a particular document.
# Another LLM will read the document and answer the question that you might have.
# Use this when the user question cannot be answered with the content you have in context.
# """
# from temp.doc_query import graph
result = graph.invoke({"doc_id": doc_id, "question": question})
return result["answer"]
# result = graph.invoke({"doc_id": doc_id, "question": question})
# return result["answer"]