Hello World to LangGraph
LangGraph is a module built on top of LangChain that makes it easier to create cyclical graphs, which are often needed for agent runtimes.
The default LangGraph example uses Tavily search. I want to change it to use Bing Search and Azure OpenAI.
Create a Bing search resource in Azure
Create a “Bing Search v7” resource (there is also a “Bing Custom Search” resource for customizing search by specifying on-topic sites and images, but don’t use it for now).
Python Packages needed
langchain
langchain-community
langchain-openai
langgraph
langchainhub
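Install them with pip (exact versions may vary over time):

pip install langchain langchain-community langchain-openai langgraph langchainhub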
Hello World program
Create a custom tool similar to the Tavily search tool
Set environment variables for Bing search
os.environ["BING_SUBSCRIPTION_KEY"] = ""
os.environ["BING_SEARCH_URL"] = "https://api.bing.microsoft.com/v7.0/search"
BingSearchResults (key customization)
import asyncio
from typing import Dict, List, Optional, Type, Union

from langchain_core.callbacks import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.tools import BaseTool
from langchain_community.utilities import BingSearchAPIWrapper
class BingSearchInput(BaseModel):
    """Input for the Bing search tool."""

    query: str = Field(description="search query to look up")


class BingSearchResults(BaseTool):
    """Tool that queries the Bing Search API and gets back json."""

    name: str = "bing_search_results_json"
    description: str = (
        "A search engine optimized for comprehensive, accurate, and trusted results. "
        "Useful for when you need to answer questions about current events. "
        "Input should be a search query."
    )
    api_wrapper: BingSearchAPIWrapper = Field(default_factory=BingSearchAPIWrapper)
    max_results: int = 5
    args_schema: Type[BaseModel] = BingSearchInput

    # synchronous
    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> Union[List[Dict], str]:
        """Use the tool."""
        try:
            return self.api_wrapper.results(
                query,
                self.max_results,
            )
        except Exception as e:
            return repr(e)
    # asynchronous
    async def _arun(
        self,
        query: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> Union[List[Dict], str]:
        """Use the tool asynchronously."""
        try:
            # BingSearchAPIWrapper does not provide an async results method,
            # so run the synchronous call in a worker thread
            # (asyncio.to_thread requires Python 3.9+)
            return await asyncio.to_thread(
                self.api_wrapper.results,
                query,
                self.max_results,
            )
        except Exception as e:
            return repr(e)
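Once defined, the tool can be exercised directly before handing it to the agent (a minimal sketch; the input dict must match BingSearchInput):

tool = BingSearchResults(max_results=2)
print(tool.invoke({"query": "San Francisco news"}))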
For the remaining steps, follow the LangGraph base agent executor example.
Create LangChain agent
The prompt for the OpenAI functions agent is essentially just “You are a helpful assistant”.
import os

from langchain import hub
from langchain.agents import create_openai_functions_agent
from langchain_openai import AzureChatOpenAI

tools = [BingSearchResults(max_results=1)]

# Get the prompt to use - you can modify this!
prompt = hub.pull("hwchase17/openai-functions-agent")

# Choose the LLM that will drive the agent
llm = AzureChatOpenAI(
    azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
    temperature=0,
    streaming=True,
)

# Construct the OpenAI Functions agent
agent_runnable = create_openai_functions_agent(llm, tools, prompt)
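AzureChatOpenAI reads its connection settings from environment variables. A minimal setup might look like the sketch below; all values are placeholders, AZURE_OPENAI_DEPLOYMENT_NAME is our own variable consumed by os.getenv above, and the API version is only an example:

os.environ["AZURE_OPENAI_API_KEY"] = ""  # key from your Azure OpenAI resource
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://<your-resource>.openai.azure.com/"
os.environ["OPENAI_API_VERSION"] = "2023-07-01-preview"  # example; use a version your resource supports
os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"] = "<your-chat-deployment>"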
Define graph state
In LangGraph, a graph is a workflow with nodes, state, and transitions between nodes. According to the base agent executor example, the state for a traditional LangChain agent has a few attributes:
- input: The input string representing the main ask from the user, passed in as input.
- chat_history: Any previous conversation messages, also passed in as input.
- intermediate_steps: A list of actions and corresponding observations that the agent takes over time. This is updated on each iteration of the agent.
- agent_outcome: The response from the agent, either an AgentAction or an AgentFinish. The AgentExecutor should finish when this is an AgentFinish; otherwise it should call the requested tools.
from typing import TypedDict, Annotated, List, Union

from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.messages import BaseMessage
import operator


class AgentState(TypedDict):
    # The input string
    input: str
    # The list of previous messages in the conversation
    chat_history: list[BaseMessage]
    # The outcome of a given call to the agent
    # Needs `None` as a valid type, since this is what this will start as
    agent_outcome: Union[AgentAction, AgentFinish, None]
    # List of actions and corresponding observations
    # Here we annotate this with `operator.add` to indicate that operations to
    # this state should be ADDED to the existing values (not overwrite it)
    intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]
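The operator.add annotation on intermediate_steps is what lets the graph accumulate tool runs across iterations instead of overwriting them. Conceptually, LangGraph merges a node's output for that key like this (illustrative only, not LangGraph internals):

import operator

existing = [("first_action", "first_observation")]
update = [("second_action", "second_observation")]
# For a key annotated with operator.add, the new state value is old + new
merged = operator.add(existing, update)
assert merged == existing + update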
Define nodes
from langchain_core.agents import AgentFinish
from langgraph.prebuilt.tool_executor import ToolExecutor

# This is a helper class we have that is useful for running tools
# It takes in an agent action and calls that tool and returns the result
tool_executor = ToolExecutor(tools)


# Define the agent
def run_agent(data):
    agent_outcome = agent_runnable.invoke(data)
    return {"agent_outcome": agent_outcome}


# Define the function to execute tools
def execute_tools(data):
    # Get the most recent agent_outcome - this is the key added in the `agent` above
    agent_action = data["agent_outcome"]
    output = tool_executor.invoke(agent_action)
    return {"intermediate_steps": [(agent_action, str(output))]}


# Define logic that will be used to determine which conditional edge to go down
def should_continue(data):
    # If the agent outcome is an AgentFinish, then we return the `end` string
    # This will be used when setting up the graph to define the flow
    if isinstance(data["agent_outcome"], AgentFinish):
        return "end"
    # Otherwise, an AgentAction is returned
    # Here we return the `continue` string
    # This will be used when setting up the graph to define the flow
    else:
        return "continue"
Define graph
from langgraph.graph import END, StateGraph

# Define a new graph
workflow = StateGraph(AgentState)

# Define the two nodes we will cycle between
workflow.add_node("agent", run_agent)
workflow.add_node("action", execute_tools)

# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.set_entry_point("agent")

# We now add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
    # Finally we pass in a mapping.
    # The keys are strings, and the values are other nodes.
    # END is a special node marking that the graph should finish.
    # What will happen is we will call `should_continue`, and then the output of that
    # will be matched against the keys in this mapping.
    # Based on which one it matches, that node will then be called.
    {
        # If `continue`, then we call the `action` (tool) node.
        "continue": "action",
        # Otherwise we finish.
        "end": END,
    },
)

# We now add a normal edge from `action` to `agent`.
# This means that after `action` is called, the `agent` node is called next.
workflow.add_edge("action", "agent")

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()
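As a sanity check of the wiring, recent langchain_core versions let you print the graph topology (this needs the grandalf package installed; treat it as an optional extra, not part of the example):

# Draws agent -> (conditional) -> action/END, and action -> agent
app.get_graph().print_ascii()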
Run graph
I am using the Bing search API for a news query.
inputs = {"input": "what is the news in sf", "chat_history": []}
for s in app.stream(inputs):
    print(s)
    print("----")
Result
{'agent': {'agent_outcome': AgentActionMessageLog(tool='bing_search_results_json', tool_input={'query': 'news in San Francisco'}, log="\nInvoking: `bing_search_results_json` with `{'query': 'news in San Francisco'}`\n\n\n", message_log=[AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{\n "query": "news in San Francisco"\n}', 'name': 'bing_search_results_json'}})])}}
----
{'action': {'intermediate_steps': [(AgentActionMessageLog(tool='bing_search_results_json', tool_input={'query': 'news in San Francisco'}, log="\nInvoking: `bing_search_results_json` with `{'query': 'news in San Francisco'}`\n\n\n", message_log=[AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{\n "query": "news in San Francisco"\n}', 'name': 'bing_search_results_json'}})]), "[{'snippet': 'ABC7 covers local <b>San Francisco</b> <b>news</b>, weather, sports and traffic so you stay up-to-date. Stay informed on your region with breaking <b>news</b> and streaming video.', 'title': 'San Francisco News | ABC7 KGO - ABC7 San Francisco', 'link': 'https://abc7news.com/san-francisco/'}]")]}}
----
{'agent': {'agent_outcome': AgentFinish(return_values={'output': 'The latest news in San Francisco can be found on ABC7 KGO. They cover local news, weather, sports, and traffic to keep you up-to-date. You can find breaking news and streaming video on their website: [ABC7 San Francisco](https://abc7news.com/san-francisco/)'}, log='The latest news in San Francisco can be found on ABC7 KGO. They cover local news, weather, sports, and traffic to keep you up-to-date. You can find breaking news and streaming video on their website: [ABC7 San Francisco](https://abc7news.com/san-francisco/)')}}
----
{'__end__': {'input': 'what is the news in sf', 'chat_history': [], 'agent_outcome': AgentFinish(return_values={'output': 'The latest news in San Francisco can be found on ABC7 KGO. They cover local news, weather, sports, and traffic to keep you up-to-date. You can find breaking news and streaming video on their website: [ABC7 San Francisco](https://abc7news.com/san-francisco/)'}, log='The latest news in San Francisco can be found on ABC7 KGO. They cover local news, weather, sports, and traffic to keep you up-to-date. You can find breaking news and streaming video on their website: [ABC7 San Francisco](https://abc7news.com/san-francisco/)'), 'intermediate_steps': [(AgentActionMessageLog(tool='bing_search_results_json', tool_input={'query': 'news in San Francisco'}, log="\nInvoking: `bing_search_results_json` with `{'query': 'news in San Francisco'}`\n\n\n", message_log=[AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{\n "query": "news in San Francisco"\n}', 'name': 'bing_search_results_json'}})]), "[{'snippet': 'ABC7 covers local <b>San Francisco</b> <b>news</b>, weather, sports and traffic so you stay up-to-date. Stay informed on your region with breaking <b>news</b> and streaming video.', 'title': 'San Francisco News | ABC7 KGO - ABC7 San Francisco', 'link': 'https://abc7news.com/san-francisco/'}]")]}}
----
The output shows the internal steps of the workflow in verbose detail:
- The program enters the entry node "agent" with the input "what is the news in sf".
- The agent node (the run_agent function) uses OpenAI function calling with the Bing search tool to convert the question text into a function call: {'function_call': {'arguments': '{\n "query": "news in San Francisco"\n}', 'name': 'bing_search_results_json'}}
- At this point the agent node returns an AgentActionMessageLog, not an AgentFinish. Due to the conditional edge definition, execution moves to the action node, which maps to the execute_tools function.
- The agent_outcome value produced by the previous node is extracted and passed to tool_executor. Since we only have one tool, that tool is invoked.
- Due to the normal edge definition, the action node then hands control back to the agent node, passing along the tool output ([{'snippet': 'ABC7 covers local <b>San Francisco</b> <b>news</b>, weather, sports and traffic so you stay up-to-date. Stay informed on your region with breaking <b>news</b> and streaming video.', 'title': 'San Francisco News | ABC7 KGO - ABC7 San Francisco', 'link': 'https://abc7news.com/san-francisco/'}]).
- The agent converts the tool output into a final text answer and returns an AgentFinish, causing the conditional edge to route to END.
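Finally, since the compiled graph is a LangChain Runnable, you can skip the streaming loop and just ask for the final state (a short sketch under the same setup as above):

# Invoke once and read the final answer out of the AgentFinish
result = app.invoke({"input": "what is the news in sf", "chat_history": []})
print(result["agent_outcome"].return_values["output"])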