🤖 Automation Workflows Cookbook¶
Production-ready automation recipes for X/Twitter. These workflows run reliably, handle errors gracefully, and respect rate limits.
The 24/7 Automation Stack¶
A complete automation system that runs continuously.
"""
The 24/7 Automation Stack
=========================
A production-ready automation system with:
- Scheduled tasks
- Error handling
- Rate limit management
- Logging and monitoring
- Graceful shutdown
"""
import asyncio
import signal
import logging
from datetime import datetime, timedelta
from typing import Callable, List
from dataclasses import dataclass, field
from xeepy import Xeepy
from xeepy.notifications import DiscordNotifier
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('automation.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('xeepy.automation')
@dataclass
class ScheduledTask:
"""A task that runs on a schedule"""
name: str
func: Callable
interval_seconds: int
last_run: datetime = None
error_count: int = 0
max_errors: int = 3
enabled: bool = True
class AutomationStack:
"""
Production automation runner with scheduling,
error handling, and monitoring.
"""
def __init__(self, notify_webhook: str = None):
self.tasks: List[ScheduledTask] = []
self.running = False
self.xeepy: Xeepy = None
self.notifier = DiscordNotifier(webhook_url=notify_webhook) if notify_webhook else None
# Handle graceful shutdown
signal.signal(signal.SIGINT, self._shutdown)
signal.signal(signal.SIGTERM, self._shutdown)
def add_task(self, name: str, func: Callable, interval: str):
"""Add a scheduled task"""
# Parse interval
intervals = {
"hourly": 3600,
"daily": 86400,
"every_6h": 21600,
"every_30m": 1800,
"every_15m": 900,
}
seconds = intervals.get(interval, int(interval))
self.tasks.append(ScheduledTask(
name=name,
func=func,
interval_seconds=seconds
))
logger.info(f"Added task: {name} (every {seconds}s)")
async def run(self):
"""Main automation loop"""
self.running = True
logger.info("🚀 Automation stack starting...")
async with Xeepy() as x:
self.xeepy = x
if self.notifier:
await self.notifier.send("🟢 Xeepy automation started")
while self.running:
for task in self.tasks:
if not task.enabled:
continue
# Check if task should run
if self._should_run(task):
await self._run_task(task)
# Sleep before next check
await asyncio.sleep(60) # Check every minute
logger.info("Automation stack stopped")
def _should_run(self, task: ScheduledTask) -> bool:
"""Check if task is due to run"""
if task.last_run is None:
return True
elapsed = (datetime.now() - task.last_run).total_seconds()
return elapsed >= task.interval_seconds
async def _run_task(self, task: ScheduledTask):
"""Execute a task with error handling"""
logger.info(f"Running task: {task.name}")
try:
await task.func(self.xeepy)
task.last_run = datetime.now()
task.error_count = 0
logger.info(f"✓ Task completed: {task.name}")
except Exception as e:
task.error_count += 1
logger.error(f"✗ Task failed: {task.name} - {e}")
if self.notifier:
await self.notifier.send(
f"⚠️ Task failed: {task.name}\nError: {str(e)[:200]}"
)
# Disable task after too many errors
if task.error_count >= task.max_errors:
task.enabled = False
logger.warning(f"Task disabled due to errors: {task.name}")
if self.notifier:
await self.notifier.send(
f"🔴 Task disabled: {task.name} (too many errors)"
)
def _shutdown(self, signum, frame):
"""Handle shutdown signal"""
logger.info("Shutdown signal received...")
self.running = False
# ============================================
# AUTOMATION TASKS
# ============================================
async def task_unfollower_check(x: Xeepy):
"""Check for new unfollowers"""
report = await x.monitor.unfollowers()
if report.unfollowers:
logger.info(f"New unfollowers: {report.unfollowers}")
logger.info(f"Follower change: {report.net_change:+d}")
async def task_engagement_routine(x: Xeepy):
"""Daily engagement routine"""
# Like tweets from your network
timeline = await x.scrape.home_timeline(limit=50)
liked = 0
for tweet in timeline:
if tweet.likes > 10 and not tweet.is_liked:
await x.engage.like(tweet.url)
liked += 1
if liked >= 20: # Limit per run
break
logger.info(f"Liked {liked} tweets")
async def task_cleanup_following(x: Xeepy):
"""Unfollow non-followers"""
result = await x.unfollow.non_followers(
max_unfollows=25,
whitelist_file="whitelist.txt",
min_days_following=7
)
logger.info(f"Unfollowed {result.unfollowed_count} non-followers")
async def task_growth_tracking(x: Xeepy):
"""Track and log growth metrics"""
growth = await x.analytics.track_growth("24h")
logger.info(f"""
Growth Report (24h):
- Followers: {growth.followers_count:,}
- Net change: {growth.net_change:+d}
- New: {len(growth.new_followers)}
- Lost: {len(growth.unfollowers)}
""")
async def task_mention_monitoring(x: Xeepy):
"""Monitor and respond to mentions"""
mentions = await x.scrape.mentions("your_username", limit=20, since="1h")
for mention in mentions:
if not mention.has_my_reply:
# Auto-like mentions
await x.engage.like(mention.url)
logger.info(f"Liked mention from @{mention.author.username}")
# ============================================
# MAIN ENTRY POINT
# ============================================
async def main():
# Create automation stack
stack = AutomationStack(
notify_webhook="https://discord.com/api/webhooks/..."
)
# Add tasks
stack.add_task("unfollower_check", task_unfollower_check, "hourly")
stack.add_task("engagement", task_engagement_routine, "every_6h")
stack.add_task("cleanup", task_cleanup_following, "daily")
stack.add_task("growth_tracking", task_growth_tracking, "every_6h")
stack.add_task("mentions", task_mention_monitoring, "every_15m")
# Run forever
await stack.run()
if __name__ == "__main__":
asyncio.run(main())
Smart Engagement Bot¶
An intelligent engagement bot that adds value, not spam.
"""
Smart Engagement Bot
====================
Engages thoughtfully based on content relevance and quality.
"""
import asyncio
from xeepy import Xeepy
from xeepy.ai import ContentGenerator, SentimentAnalyzer
class SmartEngagementBot:
"""
Engagement bot that:
- Analyzes content before engaging
- Generates contextual comments
- Respects rate limits
- Avoids spam behavior
"""
def __init__(self, keywords: list, daily_limits: dict = None):
self.keywords = keywords
self.limits = daily_limits or {
"likes": 100,
"replies": 20,
"follows": 30
}
self.daily_stats = {"likes": 0, "replies": 0, "follows": 0}
self.engaged_today = set() # Track engaged tweets
async def run(self):
async with Xeepy() as x:
ai = ContentGenerator(provider="openai")
sentiment = SentimentAnalyzer(provider="openai")
while True:
# Find relevant content
for keyword in self.keywords:
tweets = await x.scrape.search(
keyword,
limit=20,
min_likes=5,
max_age_hours=6
)
for tweet in tweets:
# Skip if already engaged
if tweet.id in self.engaged_today:
continue
# Analyze tweet quality
quality = await self._analyze_quality(tweet, sentiment)
if quality["score"] > 0.7:
await self._engage_with_tweet(x, ai, tweet, quality)
# Wait before next round
await asyncio.sleep(1800) # 30 minutes
# Reset daily stats at midnight
if self._is_new_day():
self._reset_daily_stats()
async def _analyze_quality(self, tweet, sentiment):
"""Analyze if tweet is worth engaging with"""
score = 0
reasons = []
# Sentiment analysis
sent_result = await sentiment.analyze(tweet.text)
if sent_result.label == "positive":
score += 0.3
reasons.append("positive_content")
# Engagement indicators
if tweet.likes > 50:
score += 0.2
reasons.append("high_engagement")
# Author quality
if tweet.author.followers_count > 1000:
score += 0.2
reasons.append("quality_author")
if tweet.author.verified:
score += 0.1
reasons.append("verified")
# Content quality
if len(tweet.text) > 100:
score += 0.1
reasons.append("substantial_content")
if "?" in tweet.text: # Questions are engagement opportunities
score += 0.1
reasons.append("question")
return {"score": score, "reasons": reasons}
async def _engage_with_tweet(self, x, ai, tweet, quality):
"""Engage with a quality tweet"""
self.engaged_today.add(tweet.id)
# Always like quality content
if self.daily_stats["likes"] < self.limits["likes"]:
await x.engage.like(tweet.url)
self.daily_stats["likes"] += 1
# Reply if it's a question or high-engagement opportunity
if "question" in quality["reasons"] and self.daily_stats["replies"] < self.limits["replies"]:
reply = await ai.generate_reply(
tweet_text=tweet.text,
style="helpful",
add_value=True
)
# Only reply if we have something valuable to say
if len(reply) > 50:
await x.engage.reply(tweet.url, reply)
self.daily_stats["replies"] += 1
# Consider following if quality author
if "quality_author" in quality["reasons"] and self.daily_stats["follows"] < self.limits["follows"]:
if tweet.author.followers_count < 50000: # Not too big
await x.follow.user(tweet.author.username)
self.daily_stats["follows"] += 1
def _is_new_day(self):
from datetime import datetime
return datetime.now().hour == 0 and datetime.now().minute < 30
def _reset_daily_stats(self):
self.daily_stats = {"likes": 0, "replies": 0, "follows": 0}
self.engaged_today.clear()
# Run the bot
async def main():
bot = SmartEngagementBot(
keywords=["python programming", "machine learning", "startup"],
daily_limits={"likes": 100, "replies": 15, "follows": 20}
)
await bot.run()
asyncio.run(main())
Content Pipeline Automation¶
Automate your content creation and publishing workflow.
"""
Content Pipeline Automation
===========================
From ideation to publishing, fully automated.
"""
import asyncio
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Optional
from xeepy import Xeepy
from xeepy.ai import ContentGenerator
@dataclass
class ContentItem:
"""A piece of content in the pipeline"""
id: str
content_type: str # "tweet", "thread", "reply"
text: str
status: str = "draft" # draft, scheduled, published, failed
scheduled_time: Optional[datetime] = None
published_url: Optional[str] = None
engagement_score: Optional[float] = None
class ContentPipeline:
"""
Automated content pipeline:
1. Generate content ideas based on trends
2. Create drafts using AI
3. Schedule for optimal times
4. Publish automatically
5. Track performance
"""
def __init__(self):
self.queue: List[ContentItem] = []
self.published: List[ContentItem] = []
async def run(self):
async with Xeepy() as x:
ai = ContentGenerator(provider="openai")
while True:
# Phase 1: Content Generation (every 6 hours)
if self._should_generate():
await self._generate_content(x, ai)
# Phase 2: Publishing (check every 5 minutes)
await self._publish_scheduled(x)
# Phase 3: Performance Tracking (hourly)
if self._should_track():
await self._track_performance(x)
await asyncio.sleep(300) # 5 minutes
async def _generate_content(self, x: Xeepy, ai: ContentGenerator):
"""Generate new content based on trends and best practices"""
print("📝 Generating new content...")
# Get trending topics in your niche
trending = await x.scrape.trending_topics(category="technology")
# Analyze your best performing content
my_best = await x.analytics.top_performing_tweets(limit=20)
patterns = self._extract_patterns(my_best)
# Generate content matching your style + trends
for topic in trending[:3]:
# Generate tweet
tweet = await ai.generate_tweet(
topic=topic,
style=patterns["style"],
hooks=patterns["hooks"],
max_length=280
)
self.queue.append(ContentItem(
id=f"tweet_{datetime.now().timestamp()}",
content_type="tweet",
text=tweet
))
# Generate a thread
thread_topic = await ai.suggest_thread_topic(
based_on=my_best,
trending=trending
)
thread = await ai.generate_thread(
topic=thread_topic,
style="educational",
length=8
)
self.queue.append(ContentItem(
id=f"thread_{datetime.now().timestamp()}",
content_type="thread",
text="\n---\n".join(thread.tweets)
))
# Schedule content
await self._schedule_content(x)
print(f"✅ Generated {len(self.queue)} new items")
async def _schedule_content(self, x: Xeepy):
"""Schedule content for optimal posting times"""
best_times = await x.analytics.best_time_to_post()
unscheduled = [c for c in self.queue if c.status == "draft"]
for i, content in enumerate(unscheduled):
# Find next available slot
slot = best_times.get_next_slot(
after=datetime.now() + timedelta(hours=i * 4)
)
content.scheduled_time = slot
content.status = "scheduled"
print(f" Scheduled: {content.id} for {slot}")
async def _publish_scheduled(self, x: Xeepy):
"""Publish content that's due"""
now = datetime.now()
for content in self.queue:
if content.status == "scheduled" and content.scheduled_time <= now:
try:
if content.content_type == "tweet":
result = await x.post.tweet(content.text)
elif content.content_type == "thread":
tweets = content.text.split("\n---\n")
result = await x.post.thread(tweets)
content.status = "published"
content.published_url = result.url
self.published.append(content)
print(f"✅ Published: {content.id}")
except Exception as e:
content.status = "failed"
print(f"❌ Failed to publish {content.id}: {e}")
# Remove published from queue
self.queue = [c for c in self.queue if c.status not in ["published", "failed"]]
async def _track_performance(self, x: Xeepy):
"""Track performance of published content"""
for content in self.published[-10:]: # Last 10 items
if content.published_url:
stats = await x.scrape.tweet(content.published_url)
content.engagement_score = (
stats.likes + stats.retweets * 2 + stats.replies * 3
)
def _extract_patterns(self, tweets):
"""Extract patterns from top performing tweets"""
# Simplified pattern extraction
return {
"style": "educational",
"hooks": ["How to", "Here's why", "The truth about"],
"avg_length": sum(len(t.text) for t in tweets) // len(tweets)
}
def _should_generate(self):
# Generate if queue is low
return len([c for c in self.queue if c.status == "scheduled"]) < 5
def _should_track(self):
return datetime.now().minute == 0 # Top of each hour
# Run the pipeline
asyncio.run(ContentPipeline().run())
Monitoring & Alerting System¶
Real-time monitoring with intelligent alerts.
"""
Monitoring & Alerting System
============================
Real-time monitoring with smart notifications.
"""
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Callable
from enum import Enum
from xeepy import Xeepy
from xeepy.notifications import NotificationManager
class AlertLevel(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class Alert:
level: AlertLevel
title: str
message: str
timestamp: datetime = None
def __post_init__(self):
self.timestamp = self.timestamp or datetime.now()
class MonitoringSystem:
"""
Comprehensive monitoring system:
- Real-time follower tracking
- Engagement anomaly detection
- Rate limit monitoring
- System health checks
"""
def __init__(self, config: dict):
self.config = config
self.alerts: List[Alert] = []
self.metrics_history = []
self.notifications = NotificationManager()
async def run(self):
async with Xeepy() as x:
while True:
# Collect metrics
metrics = await self._collect_metrics(x)
self.metrics_history.append(metrics)
# Check thresholds
await self._check_follower_alerts(metrics)
await self._check_engagement_alerts(metrics)
await self._check_rate_limits(x)
await self._check_system_health(x)
# Process alerts
await self._process_alerts()
# Cleanup old metrics
self._cleanup_history()
await asyncio.sleep(300) # 5 minutes
async def _collect_metrics(self, x: Xeepy):
"""Collect all relevant metrics"""
profile = await x.scrape.profile(self.config["username"])
engagement = await x.analytics.engagement_analysis("1h")
return {
"timestamp": datetime.now(),
"followers": profile.followers_count,
"following": profile.following_count,
"engagement_rate": engagement.rate,
"recent_likes": engagement.total_likes,
"recent_replies": engagement.total_replies
}
async def _check_follower_alerts(self, metrics):
"""Check for unusual follower changes"""
if len(self.metrics_history) < 2:
return
prev = self.metrics_history[-2]
change = metrics["followers"] - prev["followers"]
# Sudden drop alert
if change < -self.config["thresholds"]["follower_drop"]:
self.alerts.append(Alert(
level=AlertLevel.WARNING,
title="Sudden Follower Drop",
message=f"Lost {abs(change)} followers in last check period"
))
# Unusual gain (could be bot attack)
if change > self.config["thresholds"]["follower_spike"]:
self.alerts.append(Alert(
level=AlertLevel.INFO,
title="Follower Spike Detected",
message=f"Gained {change} followers in last check period"
))
async def _check_engagement_alerts(self, metrics):
"""Detect engagement anomalies"""
if len(self.metrics_history) < 12: # Need 1 hour of data
return
# Calculate average engagement
recent = self.metrics_history[-12:]
avg_rate = sum(m["engagement_rate"] for m in recent) / len(recent)
# Alert if engagement dropped significantly
if metrics["engagement_rate"] < avg_rate * 0.5:
self.alerts.append(Alert(
level=AlertLevel.WARNING,
title="Engagement Drop",
message=f"Engagement rate dropped to {metrics['engagement_rate']:.2%} (avg: {avg_rate:.2%})"
))
async def _check_rate_limits(self, x: Xeepy):
"""Monitor rate limit usage"""
limits = await x.get_rate_limit_status()
for endpoint, status in limits.items():
usage_pct = status["used"] / status["limit"]
if usage_pct > 0.9:
self.alerts.append(Alert(
level=AlertLevel.WARNING,
title="Rate Limit Warning",
message=f"{endpoint}: {usage_pct:.0%} of limit used"
))
async def _check_system_health(self, x: Xeepy):
"""Check system health"""
try:
# Test basic functionality
await x.auth.is_authenticated()
except Exception as e:
self.alerts.append(Alert(
level=AlertLevel.CRITICAL,
title="System Health Check Failed",
message=str(e)
))
async def _process_alerts(self):
"""Send alert notifications"""
for alert in self.alerts:
if alert.level == AlertLevel.CRITICAL:
await self.notifications.broadcast(
f"🚨 {alert.title}\n{alert.message}"
)
elif alert.level == AlertLevel.WARNING:
await self.notifications.send(
f"⚠️ {alert.title}\n{alert.message}",
channels=["discord"]
)
else:
# Just log info alerts
print(f"ℹ️ {alert.title}: {alert.message}")
self.alerts.clear()
def _cleanup_history(self):
"""Keep only last 24 hours of metrics"""
cutoff = datetime.now() - timedelta(hours=24)
self.metrics_history = [
m for m in self.metrics_history
if m["timestamp"] > cutoff
]
# Configuration
config = {
"username": "your_username",
"thresholds": {
"follower_drop": 50, # Alert if lose 50+ followers
"follower_spike": 500, # Alert if gain 500+ followers
}
}
# Run monitoring
asyncio.run(MonitoringSystem(config).run())
Docker Deployment¶
Deploy your automation with Docker.
# Dockerfile
FROM mcr.microsoft.com/playwright/python:v1.40.0-jammy
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy automation code
COPY . .
# Run automation
CMD ["python", "automation_stack.py"]
# docker-compose.yml
version: '3.8'
services:
xeepy-automation:
build: .
restart: unless-stopped
environment:
- DISCORD_WEBHOOK=${DISCORD_WEBHOOK}
- OPENAI_API_KEY=${OPENAI_API_KEY}
volumes:
- ./data:/app/data # Persist data
- ./session.json:/app/session.json # Auth session
- ./whitelist.txt:/app/whitelist.txt
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
Next Steps¶
-
Analyze your automation data
-
Turn automation into business value