Takes a research topic (e.g., “B2B enterprise CTOs from New York”, “Stripe payments company”, “history of renewable energy technologies”) and a JSON schema as input
Searches the web using Bright Data SERP API (real search results, geo-targeting)
Scrapes websites using Bright Data Web Unlocker (bypasses anti-bot measures)
Uses an LLM to extract and structure the data
Validates and returns structured JSON matching your schema
Build the LangGraph workflow that orchestrates search → scrape → extract.
Report incorrect code
Copy
# --- Agent ---

# System prompt instructs the LLM on its research task.
# {topic} and {schema} are filled in per-invocation by call_model.
SYSTEM_PROMPT = """You are a research agent. Your task is to gather information about a topic and extract structured data.

You have access to these tools:
- search: Search the web for information
- scrape_website: Get content from a specific URL
- submit_info: Call this when you have gathered all the required information

Research topic: {topic}

Required information schema:
{schema}

Search for relevant information, scrape important pages, then call submit_info with the extracted data."""


def create_agent():
    """Create the enrichment agent graph.

    Builds a LangGraph state machine with two nodes — "agent" (the LLM
    deciding the next action) and "tools" (executing search/scrape calls) —
    that loop until the model calls the dynamic ``submit_info`` tool.

    Returns:
        A compiled LangGraph runnable accepting ``{"topic", "extraction_schema"}``
        in its input state.
    """
    llm = ChatAnthropic(model="claude-sonnet-4-20250514")

    async def call_model(state: AgentState) -> dict:
        """Call the LLM to decide the next action or submit results.

        Returns a partial state update: the new AI message, plus ``info``
        (the submitted structured data, or None while research continues).
        """
        prompt = SYSTEM_PROMPT.format(
            topic=state.topic,
            schema=json.dumps(state.extraction_schema, indent=2),
        )
        messages = [HumanMessage(content=prompt)] + list(state.messages)

        # Dynamic tool for structured output submission: its parameter
        # schema is the caller-supplied extraction schema, so the model's
        # tool-call arguments arrive already shaped like the desired JSON.
        info_tool = {
            "name": "submit_info",
            "description": "Submit the extracted information when done researching.",
            "parameters": state.extraction_schema,
        }
        model = llm.bind_tools(tools + [info_tool])
        response = await model.ainvoke(messages)

        # Detect whether the agent is submitting its final answer.
        info = None
        if hasattr(response, 'tool_calls') and response.tool_calls:
            for tc in response.tool_calls:
                if tc["name"] == "submit_info":
                    info = tc["args"]
                    break
        return {"messages": [response], "info": info}

    def route(state: AgentState) -> str:
        """Route: end if info was submitted, else continue the tool loop."""
        if state.info:
            return "__end__"
        if not state.messages:
            return "agent"
        last_msg = state.messages[-1]
        if isinstance(last_msg, AIMessage) and hasattr(last_msg, 'tool_calls') and last_msg.tool_calls:
            # submit_info ends the run; any other tool call goes to the
            # tools node for execution.
            for tc in last_msg.tool_calls:
                if tc["name"] == "submit_info":
                    return "__end__"
            return "tools"
        return "agent"

    # Build the graph: agent ↔ tools loop until info is extracted.
    graph = StateGraph(AgentState)
    graph.add_node("agent", call_model)
    graph.add_node("tools", ToolNode(tools))
    graph.add_edge("__start__", "agent")
    graph.add_conditional_edges("agent", route)
    graph.add_edge("tools", "agent")
    return graph.compile()


async def enrich(topic: str, schema: dict) -> dict:
    """Run the enrichment agent and return structured data.

    Args:
        topic: Free-text research topic (e.g. "Stripe payments company").
        schema: JSON Schema describing the desired output object.

    Returns:
        A dict matching ``schema``, or ``{}`` if nothing was submitted.
    """
    agent = create_agent()
    result = await agent.ainvoke({
        "topic": topic,
        "extraction_schema": schema,
    })
    # BUGFIX: call_model writes "info": None on every non-submit step, so
    # the key exists with a None value and result.get("info", {}) would
    # return None instead of the documented {}. Use `or {}` to normalize.
    return result.get("info") or {}


# --- Example Usage ---
if __name__ == "__main__":
    import asyncio

    schema = {
        "type": "object",
        "properties": {
            "company_name": {"type": "string"},
            "industry": {"type": "string"},
            "headquarters": {"type": "string"},
            "founded": {"type": "string"},
            "key_products": {"type": "array", "items": {"type": "string"}},
        },
        "required": ["company_name", "industry"],
    }
    result = asyncio.run(enrich("Stripe payments company", schema))
    print(json.dumps(result, indent=2))