# Example: run a Deep Lookup research request with a custom column spec and poll for results.
import requests
import time
class DeepLookupAPI:
    """Small client for the Deep Lookup HTTP endpoints under ``base_url``.

    Every call sends a bearer-token ``Authorization`` header built from the
    API key, applies a per-request timeout, and raises on HTTP error status
    codes instead of trying to interpret an error body as a result.
    """

    def __init__(self, api_key, request_timeout=30):
        """Store the credentials and connection settings.

        Args:
            api_key: Token placed in the ``Authorization: Bearer`` header.
            request_timeout: Seconds to wait for each individual HTTP call
                before ``requests`` gives up. Without a timeout a stalled
                connection would block the caller forever.
        """
        self.api_key = api_key
        self.base_url = "https://api.brightdata.com/datasets/deep_lookup/v1"
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.request_timeout = request_timeout

    def _request(self, method, path, json=None):
        """Send one HTTP request to ``base_url + path`` and return the decoded JSON body."""
        response = requests.request(
            method,
            f"{self.base_url}{path}",
            headers=self.headers,
            json=json,
            timeout=self.request_timeout,
        )
        # Fail loudly on 4xx/5xx so callers do not get a misleading KeyError
        # from an error payload further down.
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _build_spec(query, columns):
        """Build the ``spec`` object sent alongside the query when triggering research."""
        return {
            "name": "companies",
            "query": query,
            "title": query.replace("Find all ", ""),
            "columns": columns,
        }

    @staticmethod
    def _deadline(max_wait):
        """Translate a ``max_wait`` in seconds into a monotonic-clock deadline (or ``None``)."""
        return None if max_wait is None else time.monotonic() + max_wait

    @staticmethod
    def _pause(poll_interval, deadline, request_id):
        """Sleep before the next poll, or raise ``TimeoutError`` once the deadline has passed."""
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Request {request_id} did not finish within the allowed time"
            )
        time.sleep(poll_interval)

    def research_with_spec(self, query, columns, limit=100, poll_interval=5,
                           max_wait=None):
        """Trigger a research request with an explicit column spec and wait for it.

        Args:
            query: Natural-language query; also used (minus a leading
                "Find all ") as the spec title.
            columns: List of column definitions forwarded unchanged in the spec.
            limit: Value sent as ``result_limit``.
            poll_interval: Seconds to sleep between status checks.
            max_wait: Optional overall limit in seconds for the polling phase;
                ``None`` (the default) polls until the request completes or fails.

        Returns:
            The decoded JSON body of ``GET /request/{request_id}``.

        Raises:
            RuntimeError: The status endpoint reported ``"failed"``.
            TimeoutError: ``max_wait`` elapsed before completion.
            requests.RequestException: An HTTP call failed, timed out, or
                returned an error status code.
        """
        trigger_response = self._request(
            "POST",
            "/trigger",
            json={
                "query": query,
                "spec": self._build_spec(query, columns),
                "result_limit": limit,
            },
        )
        request_id = trigger_response["request_id"]

        # Poll the status endpoint until the request reaches a terminal state.
        deadline = self._deadline(max_wait)
        while True:
            status_response = self._request(
                "GET", f"/request/{request_id}/status"
            )
            print(f"Progress: {status_response.get('progress', 0)}%")
            status = status_response["status"]
            if status == "completed":
                break
            if status == "failed":
                raise RuntimeError("Research failed")
            self._pause(poll_interval, deadline, request_id)

        return self._request("GET", f"/request/{request_id}")

    def monitor_progress(self, request_id, poll_interval=3, max_wait=None):
        """Print step-by-step progress of a research request until it is done.

        Args:
            request_id: Identifier returned when the request was triggered.
            poll_interval: Seconds to sleep between polls.
            max_wait: Optional overall limit in seconds; ``None`` (the default)
                polls until the ``step`` field becomes ``"done"``.

        Returns:
            The decoded JSON body of the poll in which ``step`` was ``"done"``.

        Raises:
            RuntimeError: The response carried ``status == "failed"``.
            TimeoutError: ``max_wait`` elapsed before the request was done.
            requests.RequestException: An HTTP call failed, timed out, or
                returned an error status code.
        """
        deadline = self._deadline(max_wait)
        while True:
            result = self._request("GET", f"/request/{request_id}")
            step = result.get("step", "unknown")
            if step == "identifying":
                print("Analyzing your query...")
            elif step == "generating_schema":
                print("Creating data structure...")
            elif step == "generating":
                pages = result.get("pages_read", 0)
                matched = result.get("matched_records", 0)
                print(f"Processing data: {pages} pages read, {matched} matches found")
            elif step == "done":
                print("Research completed!")
                return result
            # NOTE(review): assumes this endpoint may expose the same "status"
            # field as the /status endpoint - TODO confirm. Harmless if absent.
            if result.get("status") == "failed":
                raise RuntimeError("Research failed")
            self._pause(poll_interval, deadline, request_id)
# Usage: describe the desired columns, run the research, then print the totals.
api = DeepLookupAPI("YOUR_API_KEY")

# Each tuple is (name, description, type). The example mixes "enrichment" and
# "constraint" columns; presumably constraints filter records while
# enrichments add data to them - confirm against the API documentation.
columns = [
    {"name": column_name, "description": description, "type": column_type}
    for column_name, description, column_type in (
        ("company_name", "Name of the company", "enrichment"),
        ("is_ai_company", "Must be an AI/ML focused company", "constraint"),
        ("employee_count", "Number of employees", "enrichment"),
        ("min_50_employees", "Must have at least 50 employees", "constraint"),
    )
]

results = api.research_with_spec(
    query="Find all AI companies in Israel with more than 50 employees",
    columns=columns,
    limit=100,
)

# Summary fields are read straight from the result payload returned above.
print(f"Found {results['matched_records']} companies")
print(f"Skipped {results['skipped_records']} companies (didn't match all criteria)")
print(f"Total cost: {results['total_cost']}")