Distributed Scraping¶
Scale XTools across multiple machines for high-volume data collection.
Architecture Overview¶
```
┌─────────────────────────────────────────────────────────────┐
│                         Coordinator                         │
│  ┌─────────────┐  ┌──────────────┐  ┌───────────────────┐   │
│  │ Task Queue  │  │   Results    │  │  Worker Manager   │   │
│  │   (Redis)   │  │   Storage    │  │                   │   │
│  └─────────────┘  └──────────────┘  └───────────────────┘   │
└────────────────────────┬────────────────────────────────────┘
                         │
         ┌───────────────┼───────────────┐
         │               │               │
    ┌────┴────┐     ┌────┴────┐     ┌────┴────┐
    │ Worker 1│     │ Worker 2│     │ Worker 3│
    │ (XTools)│     │ (XTools)│     │ (XTools)│
    └─────────┘     └─────────┘     └─────────┘
```
Task Queue Setup¶
Use Redis for distributed task coordination:
```python
from xtools.distributed import TaskQueue, Task
import redis.asyncio as redis


# Initialize the shared task queue
async def setup_queue():
    # from_url returns the client directly; it is not a coroutine
    redis_client = redis.from_url("redis://localhost:6379")
    queue = TaskQueue(redis_client, name="xtools-tasks")
    return queue


# Producer: add tasks to the queue
async def enqueue_tasks():
    queue = await setup_queue()
    usernames = ["user1", "user2", "user3", ...]

    for username in usernames:
        task = Task(
            type="scrape_followers",
            payload={"username": username, "limit": 1000},
            priority=1,
        )
        await queue.enqueue(task)

    print(f"Enqueued {len(usernames)} tasks")
```
Worker Implementation¶
```python
import asyncio

from xtools import XTools
from xtools.distributed import Worker, TaskQueue


class ScraperWorker(Worker):
    """Distributed scraping worker."""

    def __init__(self, worker_id: str, queue: TaskQueue):
        self.worker_id = worker_id
        self.queue = queue
        self.xtools = None

    async def start(self):
        """Start processing tasks."""
        # Enter the XTools context manually so the session outlives a single task
        self.xtools = await XTools().__aenter__()

        while True:
            task = await self.queue.dequeue()
            if task:
                await self.process_task(task)
            else:
                # Avoid a busy loop while the queue is empty
                await asyncio.sleep(1)

    async def process_task(self, task):
        """Process a single task."""
        try:
            if task.type == "scrape_followers":
                result = await self.xtools.scrape.followers(
                    task.payload["username"],
                    limit=task.payload["limit"],
                )
                await self.queue.complete(task, result)
            elif task.type == "scrape_profile":
                result = await self.xtools.scrape.profile(
                    task.payload["username"]
                )
                await self.queue.complete(task, result)
        except Exception as e:
            await self.queue.fail(task, str(e))

    async def stop(self):
        if self.xtools:
            await self.xtools.__aexit__(None, None, None)


# Run a worker
async def run_worker(worker_id: str):
    queue = await setup_queue()
    worker = ScraperWorker(worker_id, queue)
    await worker.start()
```
Worker Scaling
Run multiple workers on different machines with unique worker IDs.
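For example, a unique ID can be derived from the hostname and process ID so that several workers per machine never collide. This naming scheme is just a convention, not something XTools requires; `run_worker` is the helper defined above:

```python
import asyncio
import os
import socket

# One worker process per invocation; the ID encodes host + PID
worker_id = f"{socket.gethostname()}-{os.getpid()}"

if __name__ == "__main__":
    asyncio.run(run_worker(worker_id))
```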
Coordinator Service¶
```python
import asyncio

from xtools.distributed import Coordinator, Task, TaskQueue


class ScrapingCoordinator(Coordinator):
    """Manages distributed scraping tasks."""

    def __init__(self, queue: TaskQueue):
        self.queue = queue
        self.results = {}

    async def scrape_many_profiles(self, usernames: list):
        """Distribute profile scraping across workers."""
        # Enqueue all tasks
        task_ids = []
        for username in usernames:
            task = Task(
                type="scrape_profile",
                payload={"username": username},
            )
            task_id = await self.queue.enqueue(task)
            task_ids.append(task_id)

        # Wait for all results
        results = await self.queue.wait_for_results(task_ids)
        return results

    async def monitor_progress(self):
        """Monitor worker progress."""
        while True:
            stats = await self.queue.get_stats()
            print(f"Pending:    {stats['pending']}")
            print(f"Processing: {stats['processing']}")
            print(f"Completed:  {stats['completed']}")
            print(f"Failed:     {stats['failed']}")
            await asyncio.sleep(5)
```
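Putting the coordinator to work might look like the sketch below, which runs the progress monitor in the background while a batch of profiles is scraped. The usernames are placeholders, and `setup_queue` is the helper from the task queue section:

```python
import asyncio


async def main():
    queue = await setup_queue()
    coordinator = ScrapingCoordinator(queue)

    # Monitor progress in the background while the batch runs
    monitor = asyncio.create_task(coordinator.monitor_progress())
    try:
        profiles = await coordinator.scrape_many_profiles(["user1", "user2", "user3"])
        print(f"Collected {len(profiles)} profiles")
    finally:
        monitor.cancel()


if __name__ == "__main__":
    asyncio.run(main())
```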
Result Aggregation¶
```python
from datetime import datetime

from xtools.distributed import ResultStore
from xtools.storage import Database


class DistributedResultStore(ResultStore):
    """Store and aggregate results from workers."""

    def __init__(self, db_url: str):
        self.db = Database(db_url)

    async def store(self, task_id: str, result: dict, task_type: str):
        """Store a result from a worker."""
        await self.db.insert("results", {
            "task_id": task_id,
            "task_type": task_type,  # needed so aggregate() can filter by type
            "data": result,
            "timestamp": datetime.now(),
        })

    async def aggregate(self, task_type: str):
        """Aggregate all results of a task type."""
        results = await self.db.query(
            "SELECT * FROM results WHERE task_type = ?",
            [task_type],
        )
        return [r["data"] for r in results]
```
Configuration¶
```yaml
# distributed.yaml
coordinator:
  host: 0.0.0.0
  port: 8000

redis:
  url: redis://redis:6379

workers:
  count: 5
  per_machine: 2

queue:
  name: xtools-tasks
  max_retries: 3
  retry_delay: 60

storage:
  type: postgresql
  url: postgresql://user:pass@db:5432/xtools
```
```python
import asyncio

from xtools.distributed import load_config, start_cluster

config = load_config("distributed.yaml")

# On the coordinator machine:
asyncio.run(start_cluster(config, role="coordinator"))

# On each worker machine (run this instead):
asyncio.run(start_cluster(config, role="worker"))
```
Load Balancing¶
```python
from xtools.distributed import LoadBalancer

balancer = LoadBalancer(
    strategy="least_loaded",  # or "round_robin", "random"
    health_check_interval=30,
)

# Register workers
balancer.register_worker("worker-1", "http://worker1:8000")
balancer.register_worker("worker-2", "http://worker2:8000")


# Distribute tasks to the selected worker
async def distribute_task(task):
    worker = await balancer.get_worker()
    await worker.submit(task)
```
Network Partitions
Handle network failures gracefully with retries and task reassignment.
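One simple defence is to retry submission with exponential backoff before giving up. The sketch below wraps the `distribute_task` helper from above; it is not an XTools API:

```python
import asyncio


async def submit_with_retry(task, max_attempts=3, base_delay=1.0):
    """Retry task submission with exponential backoff on transient network errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            await distribute_task(task)
            return
        except (ConnectionError, TimeoutError):
            if attempt == max_attempts:
                raise
            # Back off 1s, 2s, 4s, ... before trying another worker
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```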
Monitoring Dashboard¶
```python
from fastapi import FastAPI

from xtools.distributed import Dashboard

app = FastAPI()
# `queue` and `result_store` are the task queue and result store created earlier
dashboard = Dashboard(queue, result_store)


@app.get("/stats")
async def get_stats():
    return {
        "workers": await dashboard.worker_stats(),
        "tasks": await dashboard.task_stats(),
        "throughput": await dashboard.throughput_stats(),
    }


@app.get("/workers")
async def list_workers():
    return await dashboard.list_workers()


# Run with: uvicorn dashboard:app --host 0.0.0.0 --port 8080
```
Fault Tolerance¶
```python
from xtools.distributed import FaultTolerantQueue

queue = FaultTolerantQueue(
    redis_url="redis://localhost:6379",
    max_retries=3,
    retry_backoff="exponential",
    dead_letter_queue="xtools-dlq",
)


# Failed tasks land in the dead letter queue after max_retries attempts
async def process_dead_letters():
    dlq = await queue.get_dead_letters()
    for task in dlq:
        print(f"Failed task: {task.id}, Error: {task.error}")
        # Manual intervention or different processing
```
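Dead-lettered tasks can also be re-submitted once the root cause is fixed. A sketch, assuming `FaultTolerantQueue` accepts tasks through the same `enqueue()` call shown for `TaskQueue`:

```python
from xtools.distributed import Task


async def requeue_dead_letters():
    """Re-submit dead-lettered tasks after the underlying issue has been resolved."""
    for task in await queue.get_dead_letters():
        # Re-create the task so its retry counter starts from zero
        await queue.enqueue(Task(type=task.type, payload=task.payload))
```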
Idempotency
Design tasks to be idempotent—safe to retry without side effects.
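A sketch of what this can look like in practice: a worker subclass that checks the result store before doing any work, so a retried task is acknowledged instead of re-scraped. The `store` attribute and the lookup query are assumptions for illustration, not part of the original `ScraperWorker`:

```python
class IdempotentScraperWorker(ScraperWorker):
    """Skips tasks whose results were already persisted, making retries harmless."""

    def __init__(self, worker_id: str, queue: TaskQueue, store: DistributedResultStore):
        super().__init__(worker_id, queue)
        self.store = store

    async def process_task(self, task):
        # If a previous attempt already stored a result, acknowledge and move on
        existing = await self.store.db.query(
            "SELECT data FROM results WHERE task_id = ?", [task.id]
        )
        if existing:
            await self.queue.complete(task, existing[0]["data"])
            return
        await super().process_task(task)
```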