Hi all, I am trying the built-in MCP server in OpenSearch 3.3.1, specifically the code below. It fails with the error “httpx.ResponseNotRead: Attempted to access streaming response content, without having called `read()`.”
I would highly appreciate a full LangChain example that works with OpenSearch MCP streamable HTTP (with OpenSearch security enabled).
# Use server from examples/servers/streamable-http-stateless/
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
import httpx
import base64
username='admin'
password='SomePass'
credentials = f"{username}:{password}".encode("utf-8")
auth_header = f"Basic {base64.b64encode(credentials).decode('utf-8')}"
headers = {
    "Authorization": auth_header,
    "Content-Type": "application/json"
}
# Define a custom httpx client factory function
def custom_httpx_client_factory(headers=None, timeout=None, auth=None) -> httpx.AsyncClient:
    """
    Returns a custom httpx.AsyncClient instance with specific configurations.
    """
    print('headers', headers)
    client = httpx.AsyncClient(
        verify=False
        # You can add other httpx.AsyncClient configurations here, e.g., proxies
    )
    return client
client = MultiServerMCPClient({
    "opensearch": {
        "url": "https://localhost:9200/_plugins/_ml/mcp",
        "transport": "streamable_http",
        "headers": {
            "Content-Type": "application/json",
            "Authorization": auth_header
        },
        "httpx_client_factory": custom_httpx_client_factory
    }
})
tools = await client.get_tools()
agent = create_react_agent(llm, tools)
math_response = await agent.ainvoke({"messages": "what's (3 + 5) x 12?"})
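For context, httpx raises ResponseNotRead whenever the body of a streamed response is accessed before it has been read, independently of MCP. A minimal sketch of how the same exception arises (the httpbin URL is used purely for illustration):

import asyncio
import httpx

async def demo():
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", "https://httpbin.org/get") as response:
            try:
                _ = response.text  # body not read yet -> httpx.ResponseNotRead
            except httpx.ResponseNotRead as exc:
                print("Got:", exc)
            await response.aread()  # after reading, the body is accessible
            print(response.text[:60])

asyncio.run(demo())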
This has been resolved with the following code. Compared to the failing version, it no longer sets an explicit Content-Type header on the MCP connection, uses a local model served through an OpenAI-compatible endpoint, and streams the agent's intermediate steps with astream_events.
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from langchain_anthropic import ChatAnthropic # or other LLM provider
from langchain_openai import ChatOpenAI
import sys
from langgraph.constants import END
MODEL_NAME = "granite4:350m" # Replace with the model you want to test (e.g., "llama3", "gemma:2b")
llm = ChatOpenAI(
    model=MODEL_NAME,
    base_url="http://localhost:11434/v1",
    api_key="",
    temperature=0.6
)
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
import base64
import httpx
username='admin'
password='mypassword'
credentials = f"{username}:{password}".encode("utf-8")
auth_header = f"Basic {base64.b64encode(credentials).decode('utf-8')}"
# Define a custom httpx client factory function
def custom_httpx_client_factory(headers=None, timeout=None, auth=None) -> httpx.AsyncClient:
    """
    Returns a custom httpx.AsyncClient instance with specific configurations.
    """
    client = httpx.AsyncClient(
        verify=False
        # You can add other httpx.AsyncClient configurations here, e.g., proxies
    )
    return client
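Note that this factory ignores the headers, timeout and auth arguments it receives. If the transport ever relies on them, a variant along these lines could forward them to the client (a sketch; passthrough_httpx_client_factory is just an illustrative name, and it assumes the adapter passes those arguments into the factory):

def passthrough_httpx_client_factory(headers=None, timeout=None, auth=None) -> httpx.AsyncClient:
    # Forward whatever the MCP transport supplies so the Authorization header
    # and timeouts are not silently dropped; keep verify=False for the
    # self-signed demo certificate.
    return httpx.AsyncClient(headers=headers, timeout=timeout, auth=auth, verify=False)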
client = MultiServerMCPClient({
    "opensearch": {
        "url": "https://localhost:9200/_plugins/_ml/mcp",
        "transport": "streamable_http",
        "headers": {
            "Authorization": auth_header
        },
        "httpx_client_factory": custom_httpx_client_factory
    }
})
tools = await client.get_tools()
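load_mcp_tools is imported above but not actually used. As an alternative to client.get_tools(), the tools can also be loaded over an explicit session, following the usual langchain-mcp-adapters pattern (a sketch, assuming the same "opensearch" server name as above):

# Alternative: load the tools over an explicit MCP session instead of
# calling client.get_tools() directly.
async with client.session("opensearch") as session:
    tools = await load_mcp_tools(session)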
# Create the ReAct agent
agent = create_react_agent(
    model=llm,
    tools=tools,
    prompt="You are a helpful assistant."
)
user_query_1 = {"messages": [{"role": "user", "content": "list all indices"}]}
print("--- Agent Execution Log (astream_events) ---")
print(f"User Query: {user_query_1['messages'][0]['content']}\n")
# Use astream_events for detailed, structured streaming
async for event in agent.astream_events(user_query_1, version="v1"):
    # 1. Handle LLM streaming (thinking and final answer)
    if event["event"] == "on_chat_model_stream":
        # The data contains chunks from the LLM.
        # Check if the chunk is part of the final assistant message
        content = event["data"]["chunk"].content
        if content:
            # Print intermediate LLM thoughts or the final response chunk
            print(content, end="", flush=True)
    # 2. Handle tool calls
    elif event["event"] == "on_tool_start":
        # Print a clear indicator when a tool is called
        print(f"\n\n⚙️ Calling Tool: {event['name']} with input: {event['data'].get('input')}")
    # 3. Handle tool results
    elif event["event"] == "on_tool_end":
        # Print the result received from the tool
        print(f"\n✅ Tool Result Received. Output: {event['data'].get('output')}")
    # 4. Handle end of agent run
    elif event["event"] == "on_chain_end":
        if event["name"] == END:  # LangGraph's special END node
            # The final full response is often available here, but we prefer
            # the stream from 'on_chat_model_stream' for word-by-word printing.
            # We just print a clear line to mark the completion.
            print("\n\n--- Agent Execution Complete ---")
# Print a final newline for clean terminal output
print()
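The snippets above use await at module level, which only works in an async context such as a Jupyter notebook or an async REPL. To run the same flow as a plain Python script, the awaited calls can be wrapped in an async function and driven by asyncio.run (a minimal sketch; main is just an illustrative name):

import asyncio

async def main():
    tools = await client.get_tools()
    agent = create_react_agent(model=llm, tools=tools, prompt="You are a helpful assistant.")
    result = await agent.ainvoke({"messages": [{"role": "user", "content": "list all indices"}]})
    # The final assistant message is the last entry in the returned state
    print(result["messages"][-1].content)

asyncio.run(main())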