Skip to content

Data-Driven Hashtag Optimization

Build a comprehensive hashtag research and optimization system using scraped performance data and heuristic opportunity scoring.


Overview

This recipe creates a hashtag optimization system with:

  • Performance scraping - Analyze hashtag metrics
  • Reach vs competition - Find optimal hashtags
  • Niche discovery - Uncover hidden gems
  • Trending analysis - Time hashtag usage
  • A/B testing - Validate strategies
  • Weekly reports - Track progress

System Architecture

┌─────────────────┐     ┌──────────────┐     ┌─────────────────┐
│  Hashtag        │────▶│  Performance │────▶│  Opportunity    │
│  Scraper        │     │  Analyzer    │     │  Scorer         │
└─────────────────┘     └──────────────┘     └─────────────────┘
        │                       │                     │
        ▼                       ▼                     ▼
┌─────────────────┐     ┌──────────────┐     ┌─────────────────┐
│  Trending       │     │  A/B Test    │     │  Strategy       │
│  Monitor        │     │  Framework   │     │  Generator      │
└─────────────────┘     └──────────────┘     └─────────────────┘

Data Models

# hashtag_models.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class HashtagMetrics:
    """Raw performance figures for one hashtag, measured over a tweet sample.

    All averages and ratios describe only the sampled tweets, not the
    hashtag's full history.
    """
    tag: str  # Hashtag text; HashtagScraper stores it without the leading '#'
    total_tweets: int  # Number of tweets in the analyzed sample
    tweets_per_hour: float  # Sample size divided by the hours the sample spans
    avg_likes: float  # Mean likes per sampled tweet
    avg_retweets: float  # Mean retweets per sampled tweet
    avg_replies: float  # Mean replies per sampled tweet
    engagement_rate: float  # (likes + retweets + replies) / summed author followers
    top_accounts_ratio: float  # Fraction (0-1) of sampled tweets from the 10 most active authors
    unique_authors: int  # Distinct author usernames in the sample
    peak_hours: list[int]  # Up to 3 hours of day with the most sampled tweets
    scraped_at: datetime = field(default_factory=datetime.now)  # Naive local timestamp set at creation

@dataclass
class HashtagScore:
    """Scoring result for one hashtag, as produced by OpportunityScorer."""
    tag: str
    reach_score: float      # 0-100
    # NOTE: higher means LESS competition. OpportunityScorer computes it as
    # "lower competition = higher score" and StrategyGenerator treats
    # values above 70 as low-competition niche tags.
    competition_score: float  # 0-100 (higher = less competition)
    relevance_score: float   # 0-100
    opportunity_score: float  # Combined score (weighted components plus engagement bonus)
    recommendation: str      # use_always, use_sometimes, avoid
    notes: list[str] = field(default_factory=list)  # Human-readable reasons for the recommendation

@dataclass
class HashtagStrategy:
    """Categorized hashtags to draw from when composing posts."""
    primary_tags: list[str]    # Always use (StrategyGenerator selects at most 3)
    secondary_tags: list[str]  # Rotate (StrategyGenerator keeps at most 10)
    trending_slots: int        # Number of per-post slots reserved for trending tags
    niche_tags: list[str]      # Low competition gems (StrategyGenerator keeps at most 5)
    avoid_tags: list[str]      # Overused/spammy (StrategyGenerator keeps at most 10)

Hashtag Scraper

# hashtag_scraper.py
import asyncio
from datetime import datetime, timedelta
from collections import Counter

from xeepy import Xeepy
from hashtag_models import HashtagMetrics

class HashtagScraper:
    """Scrape and analyze hashtag performance.

    Attributes:
        cache: Latest non-empty metrics per normalized tag (no leading ``#``).
            This class only writes to it; it is never consulted before
            scraping, so every ``analyze_hashtag`` call hits the network.
            Callers that want to reuse results can read it directly.
    """

    def __init__(self):
        self.cache: dict[str, HashtagMetrics] = {}

    async def analyze_hashtag(
        self,
        tag: str,
        sample_size: int = 200
    ) -> HashtagMetrics:
        """Analyze performance metrics for a hashtag.

        Args:
            tag: Hashtag with or without the leading ``#``.
            sample_size: Maximum number of tweets requested from the scraper.

        Returns:
            Metrics computed over the returned sample, or all-zero metrics
            when the scraper returns nothing (the zero result is not cached).
        """

        # Remove # if present
        tag = tag.lstrip('#')

        async with Xeepy() as x:
            # Scrape recent tweets with hashtag
            tweets = await x.scrape.hashtag(
                f"#{tag}",
                limit=sample_size
            )

            if not tweets:
                return self._empty_metrics(tag)

            # Computed while the session is still open, exactly as before,
            # in case tweet objects depend on the live session.
            metrics = self._compute_metrics(tag, tweets)

            self.cache[tag] = metrics
            return metrics

    async def analyze_multiple(
        self,
        tags: list[str],
        sample_size: int = 100,
        delay_seconds: float = 2.0
    ) -> list[HashtagMetrics]:
        """Analyze multiple hashtags sequentially.

        Args:
            tags: Hashtags to analyze, with or without the leading ``#``.
            sample_size: Maximum tweets requested per hashtag.
            delay_seconds: Pause between consecutive requests (rate limiting).

        Returns:
            One ``HashtagMetrics`` per input tag, in input order.
        """

        results = []
        for index, tag in enumerate(tags):
            # Pause only BETWEEN requests; sleeping after the final tag would
            # just delay the return without protecting any further request.
            if index:
                await asyncio.sleep(delay_seconds)
            results.append(await self.analyze_hashtag(tag, sample_size))

        return results

    @staticmethod
    def _compute_metrics(tag: str, tweets) -> HashtagMetrics:
        """Derive metrics from a non-empty tweet sample (pure computation).

        Each tweet is expected to expose ``created_at``, ``like_count``,
        ``retweet_count``, ``reply_count``, ``author.username`` and
        ``author.followers_count``.
        """
        total = len(tweets)

        # Time span, floored at one hour so a burst of tweets within a few
        # minutes does not produce an inflated hourly rate.
        timestamps = [t.created_at for t in tweets]
        hours_span = max(1, (max(timestamps) - min(timestamps)).total_seconds() / 3600)

        # Engagement totals
        like_total = sum(t.like_count for t in tweets)
        retweet_total = sum(t.retweet_count for t in tweets)
        reply_total = sum(t.reply_count for t in tweets)

        # Author analysis: share of the sample written by the 10 busiest authors
        author_counts = Counter(t.author.username for t in tweets)
        top_10_tweets = sum(count for _, count in author_counts.most_common(10))

        # Engagement rate: followers are summed per tweet, so an author with
        # several sampled tweets contributes their follower count each time.
        total_followers = sum(t.author.followers_count for t in tweets)
        total_engagement = like_total + retweet_total + reply_total

        # Peak hours
        hour_counts = Counter(ts.hour for ts in timestamps)

        return HashtagMetrics(
            tag=tag,
            total_tweets=total,
            tweets_per_hour=total / hours_span,
            avg_likes=like_total / total,
            avg_retweets=retweet_total / total,
            avg_replies=reply_total / total,
            engagement_rate=total_engagement / max(total_followers, 1),
            top_accounts_ratio=top_10_tweets / total,
            unique_authors=len(author_counts),
            peak_hours=[hour for hour, _ in hour_counts.most_common(3)]
        )

    def _empty_metrics(self, tag: str) -> HashtagMetrics:
        """Return all-zero metrics for a hashtag with no sampled tweets."""
        return HashtagMetrics(
            tag=tag,
            total_tweets=0,
            tweets_per_hour=0,
            avg_likes=0,
            avg_retweets=0,
            avg_replies=0,
            engagement_rate=0,
            top_accounts_ratio=0,
            unique_authors=0,
            peak_hours=[]
        )

Opportunity Scorer

# opportunity_scorer.py
import math
from hashtag_models import HashtagMetrics, HashtagScore

class OpportunityScorer:
    """Score hashtags for opportunity potential.

    The opportunity score is ``0.3 * reach + 0.3 * competition +
    0.3 * relevance`` (each component 0-100) plus an engagement bonus of up
    to 20 points, so it ranges from 0 to 110.
    """

    def __init__(
        self,
        target_tweets_per_hour: float = 50,  # Ideal activity level
        target_engagement_rate: float = 0.02,
        min_unique_authors: int = 20
    ):
        """Configure the scoring targets.

        Raises:
            ValueError: If either target is not positive. Both are used as
                divisors when scoring, so a zero target would otherwise
                surface later as ``ZeroDivisionError``.
        """
        if target_tweets_per_hour <= 0:
            raise ValueError("target_tweets_per_hour must be positive")
        if target_engagement_rate <= 0:
            raise ValueError("target_engagement_rate must be positive")

        self.target_tph = target_tweets_per_hour
        self.target_engagement = target_engagement_rate
        self.min_authors = min_unique_authors

    def score(
        self,
        metrics: HashtagMetrics,
        relevance: float = 1.0  # 0-1, how relevant to your niche
    ) -> HashtagScore:
        """Calculate opportunity score for hashtag.

        Args:
            metrics: Scraped metrics for the hashtag.
            relevance: Relevance to your niche; values outside 0-1 are
                clamped so the relevance component stays within 0-100 like
                the other two components.

        Returns:
            A ``HashtagScore`` with component scores, the combined score, a
            recommendation and at least one note explaining it.
        """
        reach = self._reach_score(metrics.tweets_per_hour)
        competition = self._competition_score(
            metrics.unique_authors,
            metrics.top_accounts_ratio
        )

        # Relevance score (clamped; in-range values keep their original type)
        if relevance < 0:
            relevance = 0
        elif relevance > 1:
            relevance = 1
        relevance_score = relevance * 100

        # Engagement bonus: 10 points at the target rate, capped at 20
        engagement_bonus = min(20, metrics.engagement_rate / self.target_engagement * 10)

        # Combined opportunity score
        opportunity = (
            reach * 0.30 +
            competition * 0.30 +
            relevance_score * 0.30 +
            engagement_bonus
        )

        # Generate recommendation
        notes = []
        if opportunity >= 70:
            recommendation = "use_always"
            notes.append("Excellent opportunity hashtag")
        elif opportunity >= 50:
            recommendation = "use_sometimes"
            notes.append("Good for rotation")
        else:
            recommendation = "avoid"
            if competition < 40:
                notes.append("Too competitive/saturated")
            if reach < 40:
                notes.append("Low visibility potential")
            if relevance_score < 40:
                notes.append("Low relevance to your niche")
            if not notes:
                # Every component is mediocre but none is individually bad;
                # still give the reader a reason instead of an empty list.
                notes.append("Low overall opportunity score")

        # Add specific notes
        if metrics.tweets_per_hour > 500:
            notes.append("Very high volume - tweets get buried quickly")
        if metrics.top_accounts_ratio > 0.5:
            notes.append("Dominated by few accounts")
        if metrics.engagement_rate > 0.05:
            notes.append("High engagement hashtag!")

        return HashtagScore(
            tag=metrics.tag,
            reach_score=round(reach, 1),
            competition_score=round(competition, 1),
            relevance_score=round(relevance_score, 1),
            opportunity_score=round(opportunity, 1),
            recommendation=recommendation,
            notes=notes
        )

    def _reach_score(self, tweets_per_hour: float) -> float:
        """Return the 0-100 reach score for an activity level.

        Too little activity means no reach and too much means noise; in
        between, the score drops 30 points per decade of distance from the
        target rate.
        """
        if tweets_per_hour < 1:
            reach = 10  # Very low activity
        elif tweets_per_hour > self.target_tph * 10:
            reach = 30  # Very high activity (saturated)
        else:
            # Optimal is around target; ratios below 0.1 are treated as 0.1
            ratio = tweets_per_hour / self.target_tph
            reach = 100 - abs(math.log10(max(0.1, ratio))) * 30
        return max(0, min(100, reach))

    def _competition_score(self, unique_authors: int, top_accounts_ratio: float) -> float:
        """Return the 0-100 competition score (higher = less competition)."""
        if unique_authors < self.min_authors:
            competition = 20  # Too few authors = dominated by few
        else:
            # Diversity saturates at 100 authors; concentration in the top
            # accounts costs up to 50 points.
            author_diversity = min(1, unique_authors / 100)
            concentration_penalty = top_accounts_ratio * 50
            competition = author_diversity * 100 - concentration_penalty
        return max(0, min(100, competition))

Niche Hashtag Discoverer

# niche_discoverer.py
import re
from collections import Counter

from xeepy import Xeepy

class NicheDiscoverer:
    """Discover niche hashtags from successful accounts."""

    # Compiled once; every extraction below runs on lower-cased tweet text.
    _HASHTAG_RE = re.compile(r'#(\w+)')

    async def discover_from_accounts(
        self,
        successful_accounts: list[str],
        tweets_per_account: int = 50,
        min_occurrences: int = 3
    ) -> list[tuple[str, int]]:
        """Discover hashtags used by successful accounts.

        Args:
            successful_accounts: Usernames whose recent tweets are scanned.
            tweets_per_account: Maximum tweets requested per account.
            min_occurrences: Minimum total uses for a tag to be reported.

        Returns:
            ``(tag, count)`` pairs, most frequent first, taken from the 100
            most common lower-cased tags and filtered by ``min_occurrences``.
        """

        counter = Counter()

        async with Xeepy() as x:
            for username in successful_accounts:
                tweets = await x.scrape.tweets(username, limit=tweets_per_account)

                for tweet in tweets:
                    # Extract hashtags
                    counter.update(self._HASHTAG_RE.findall(tweet.text.lower()))

        # Filter by minimum occurrences
        return [
            (tag, count) for tag, count in counter.most_common(100)
            if count >= min_occurrences
        ]

    async def discover_related(
        self,
        seed_hashtag: str,
        depth: int = 2
    ) -> list[str]:
        """Discover related hashtags from a seed.

        Performs a breadth-first walk: each level scrapes the current tags
        and collects the hashtags that co-occur with them. At most the 20
        most frequent new co-occurring tags are carried to the next level.

        Args:
            seed_hashtag: Starting hashtag, with or without ``#``. It is
                lower-cased so it matches the lower-cased co-occurring tags
                and is not scraped or returned a second time.
            depth: Number of levels to scrape; ``depth <= 0`` returns ``[]``.

        Returns:
            The seed followed by related tags in discovery order, including
            the tags found on the final level (which are not scraped).
        """
        if depth <= 0:
            return []

        # dict used as an insertion-ordered set so the result is deterministic
        discovered: dict[str, None] = {}
        frontier = [seed_hashtag.lstrip('#').lower()]

        async with Xeepy() as x:
            for _ in range(depth):
                co_counts = Counter()

                for tag in frontier:
                    if tag in discovered:
                        continue

                    discovered[tag] = None

                    # Scrape tweets with this hashtag
                    tweets = await x.scrape.hashtag(f"#{tag}", limit=50)

                    # Extract co-occurring hashtags
                    for tweet in tweets:
                        co_counts.update(self._HASHTAG_RE.findall(tweet.text.lower()))

                # Limit breadth, keeping the most frequent co-occurring tags
                # rather than an arbitrary slice of a set.
                frontier = [
                    tag for tag, _ in co_counts.most_common()
                    if tag not in discovered
                ][:20]

                if not frontier:
                    break

        # Tags found on the last level were never scraped themselves, but
        # they are related hashtags and must not be dropped.
        for tag in frontier:
            discovered.setdefault(tag, None)

        return list(discovered)

A/B Testing Framework

# hashtag_ab_test.py
import random
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Optional
import statistics

@dataclass
class ABTestResult:
    """Summary of an A/B comparison between two hashtag sets."""
    test_name: str
    variant_a_tags: list[str]
    variant_b_tags: list[str]
    variant_a_tweets: int  # Number of recorded tweets for variant A
    variant_b_tweets: int  # Number of recorded tweets for variant B
    variant_a_avg_engagement: float  # Mean weighted engagement (0 if insufficient data)
    variant_b_avg_engagement: float  # Mean weighted engagement (0 if insufficient data)
    winner: str  # "A", "B", "no_difference" or "insufficient_data"
    confidence: float  # Heuristic 0-0.95 derived from effect size; not a p-value
    recommendation: str

class HashtagABTest:
    """A/B test hashtag strategies."""

    def __init__(self, test_name: str, min_samples: int = 5):
        """Create an empty test.

        Args:
            test_name: Label copied into the result.
            min_samples: Tweets required per variant before ``analyze``
                compares them. Must be at least 2 because the sample
                standard deviation is undefined for a single value.

        Raises:
            ValueError: If ``min_samples`` is below 2.
        """
        if min_samples < 2:
            raise ValueError("min_samples must be at least 2")

        self.test_name = test_name
        self.min_samples = min_samples
        self.variant_a_tags: list[str] = []
        self.variant_b_tags: list[str] = []
        self.results_a: list[dict] = []
        self.results_b: list[dict] = []
        self.started_at: Optional[datetime] = None

    def setup(
        self,
        variant_a: list[str],
        variant_b: list[str]
    ):
        """Setup test variants and record the start time."""
        self.variant_a_tags = variant_a
        self.variant_b_tags = variant_b
        self.started_at = datetime.now()

    def get_tags_for_tweet(self) -> tuple[list[str], str]:
        """Get hashtags for next tweet (random assignment).

        Returns:
            The chosen variant's tag list and its label (``'A'`` or ``'B'``).
        """
        variant = random.choice(['A', 'B'])

        if variant == 'A':
            return self.variant_a_tags, 'A'
        else:
            return self.variant_b_tags, 'B'

    def record_result(
        self,
        variant: str,
        tweet_id: str,
        likes: int,
        retweets: int,
        replies: int
    ):
        """Record tweet performance.

        Engagement is weighted: a like counts 1, a retweet 2, a reply 3.

        Args:
            variant: ``'A'`` or ``'B'`` (case and surrounding whitespace are
                ignored).

        Raises:
            ValueError: If ``variant`` is neither A nor B. Previously any
                label other than ``'A'`` was silently counted as B, which
                corrupted the comparison on a typo or a lower-case ``'a'``.
        """
        label = str(variant).strip().upper()
        if label == 'A':
            bucket = self.results_a
        elif label == 'B':
            bucket = self.results_b
        else:
            raise ValueError(f"Unknown variant {variant!r}; expected 'A' or 'B'")

        bucket.append({
            'tweet_id': tweet_id,
            'likes': likes,
            'retweets': retweets,
            'replies': replies,
            'engagement': likes + retweets * 2 + replies * 3,
            'recorded_at': datetime.now()
        })

    def analyze(self) -> ABTestResult:
        """Analyze test results.

        The comparison uses a standardized effect size (difference of means
        divided by the root mean square of the two sample standard
        deviations). This is a heuristic, not a significance test: effects
        below 0.2 count as no difference, and the reported confidence is
        ``0.5 + 0.2 * effect`` capped at 0.95.

        Returns:
            An ``ABTestResult``; its winner is ``"insufficient_data"`` until
            both variants have at least ``min_samples`` recorded tweets.
        """

        if len(self.results_a) < self.min_samples or len(self.results_b) < self.min_samples:
            return ABTestResult(
                test_name=self.test_name,
                variant_a_tags=self.variant_a_tags,
                variant_b_tags=self.variant_b_tags,
                variant_a_tweets=len(self.results_a),
                variant_b_tweets=len(self.results_b),
                variant_a_avg_engagement=0,
                variant_b_avg_engagement=0,
                winner="insufficient_data",
                confidence=0,
                recommendation=f"Need at least {self.min_samples} tweets per variant"
            )

        # Calculate averages
        eng_a = [r['engagement'] for r in self.results_a]
        eng_b = [r['engagement'] for r in self.results_b]

        avg_a = statistics.mean(eng_a)
        avg_b = statistics.mean(eng_b)

        # Both samples hold at least min_samples (>= 2) values, so the
        # sample standard deviation is always defined here.
        std_a = statistics.stdev(eng_a)
        std_b = statistics.stdev(eng_b)

        # Calculate effect size
        pooled_std = ((std_a ** 2 + std_b ** 2) / 2) ** 0.5
        if pooled_std > 0:
            effect_size = abs(avg_a - avg_b) / pooled_std
        elif avg_a != avg_b:
            # No spread at all but different means: the variants differ on
            # every single tweet, which is the strongest possible effect.
            effect_size = float("inf")
        else:
            effect_size = 0

        # Determine winner
        if effect_size < 0.2:
            winner = "no_difference"
            confidence = 0.5
        else:
            winner = "A" if avg_a > avg_b else "B"
            confidence = min(0.95, 0.5 + effect_size * 0.2)

        # Generate recommendation
        if winner == "no_difference":
            recommendation = "No significant difference. Consider testing other variations."
        elif confidence > 0.8:
            recommendation = f"Strong evidence for Variant {winner}. Implement these hashtags."
        else:
            recommendation = f"Slight edge for Variant {winner}. Continue testing for confirmation."

        return ABTestResult(
            test_name=self.test_name,
            variant_a_tags=self.variant_a_tags,
            variant_b_tags=self.variant_b_tags,
            variant_a_tweets=len(self.results_a),
            variant_b_tweets=len(self.results_b),
            variant_a_avg_engagement=round(avg_a, 2),
            variant_b_avg_engagement=round(avg_b, 2),
            winner=winner,
            confidence=round(confidence, 2),
            recommendation=recommendation
        )

Strategy Generator

# strategy_generator.py
from hashtag_models import HashtagScore, HashtagStrategy

class StrategyGenerator:
    """Generate optimized hashtag strategies."""

    def __init__(self, max_hashtags: int = 5):
        """Store the per-post hashtag budget used to size trending slots."""
        self.max_hashtags = max_hashtags

    def generate_strategy(
        self,
        scored_hashtags: list[HashtagScore],
        trending: list[str] = None
    ) -> HashtagStrategy:
        """Generate optimized hashtag strategy.

        Args:
            scored_hashtags: Scores to categorize, in any order.
            trending: Accepted for API compatibility; currently unused.

        Returns:
            A strategy with up to 3 primary, 10 secondary, 5 niche and 10
            avoid tags, each list ordered by descending opportunity score.
        """

        # Sort by opportunity score
        sorted_tags = sorted(
            scored_hashtags,
            key=lambda s: s.opportunity_score,
            reverse=True
        )

        # Categorize
        primary = []
        secondary = []
        niche = []
        avoid = []

        for score in sorted_tags:
            if score.recommendation == "use_always":
                # The three best "use_always" tags are primary; the rest
                # overflow into the rotation pool.
                if len(primary) < 3:
                    primary.append(score.tag)
                else:
                    secondary.append(score.tag)

            elif score.recommendation == "use_sometimes":
                if score.competition_score > 70:  # Low competition
                    niche.append(score.tag)
                else:
                    secondary.append(score.tag)

            else:  # avoid
                avoid.append(score.tag)

        # Determine trending slots: whatever the budget leaves after the
        # primary tags, minus one slot kept for a secondary/niche tag.
        trending_slots = max(0, self.max_hashtags - len(primary) - 1)

        return HashtagStrategy(
            primary_tags=primary[:3],
            secondary_tags=secondary[:10],
            trending_slots=trending_slots,
            niche_tags=niche[:5],
            avoid_tags=avoid[:10]
        )

    def get_tags_for_post(
        self,
        strategy: HashtagStrategy,
        trending: list[str] = None,
        max_tags: int = 5
    ) -> list[str]:
        """Get optimized hashtags for a specific post.

        Primary tags come first, then up to ``strategy.trending_slots``
        trending tags that are not on the avoid list, then a random mix of
        secondary and niche tags. A tag never appears twice, even when it is
        present in several of those sources.

        Args:
            strategy: Strategy to draw tags from.
            trending: Currently trending tags, most important first.
            max_tags: Maximum number of tags returned; values <= 0 yield ``[]``.

        Returns:
            At most ``max_tags`` distinct tags.
        """
        if max_tags <= 0:
            return []

        # Always include primary tags (dict.fromkeys de-duplicates in order)
        tags = list(dict.fromkeys(strategy.primary_tags))

        # Add trending if available and relevant
        if trending and strategy.trending_slots > 0:
            avoid = set(strategy.avoid_tags)
            relevant_trending = [
                t for t in dict.fromkeys(trending)
                if t not in avoid and t not in tags
            ][:strategy.trending_slots]
            tags.extend(relevant_trending)

        # Fill remaining with secondary/niche
        remaining = max_tags - len(tags)
        if remaining > 0:
            import random

            # Mix secondary and niche, skipping anything already chosen
            pool = [
                t for t in dict.fromkeys(strategy.secondary_tags + strategy.niche_tags)
                if t not in tags
            ]
            random.shuffle(pool)
            tags.extend(pool[:remaining])

        return tags[:max_tags]

Weekly Report Generator

# hashtag_report.py
from datetime import datetime
from hashtag_models import HashtagMetrics, HashtagScore

class HashtagReportGenerator:
    """Generate weekly hashtag performance reports."""

    def generate_report(
        self,
        metrics: list[HashtagMetrics],
        scores: list[HashtagScore],
        your_usage: dict[str, int] = None
    ) -> str:
        """Generate markdown report.

        The report has a ranking table (top 15 by opportunity score), the
        best "use_always" tags with their notes, up to five low-competition
        tags (competition score above 70) that have matching metrics, and up
        to five "avoid" tags with their notes.

        Args:
            metrics: Scraped metrics; matched to scores by tag.
            scores: Scores to report on.
            your_usage: Accepted but not used by the report.

        Returns:
            The report as a markdown string.
        """
        badges = {
            "use_always": "✅",
            "use_sometimes": "🔄",
            "avoid": "❌"
        }

        parts = [
            "\n# Hashtag Performance Report\n",
            f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n",
            "\n## Top Performing Hashtags\n",
            "\n| Hashtag | Opportunity | Reach | Competition | Recommendation |\n",
            "|---------|-------------|-------|-------------|----------------|\n",
        ]

        # Highest opportunity first; the sort is stable, so filtering this
        # list later keeps the same relative order as sorting a filtered copy.
        ranked = sorted(scores, key=lambda s: s.opportunity_score, reverse=True)

        for entry in ranked[:15]:
            emoji = badges.get(entry.recommendation, "")
            parts.append(
                f"| #{entry.tag} | {entry.opportunity_score} | {entry.reach_score} | {entry.competition_score} | {emoji} {entry.recommendation} |\n"
            )

        parts.append("\n## Hashtag Insights\n\n### Best Opportunities\n")

        top_picks = [s for s in ranked if s.recommendation == "use_always"]
        for entry in top_picks[:5]:
            parts.append(f"\n**#{entry.tag}** (Score: {entry.opportunity_score})\n")
            parts.extend(f"- {note}\n" for note in entry.notes)

        parts.append("\n### Niche Gems (Low Competition)\n")

        # First metrics object per tag wins, mirroring a front-to-back search.
        metrics_by_tag = {}
        for item in metrics:
            metrics_by_tag.setdefault(item.tag, item)

        low_competition = [s for s in ranked if s.competition_score > 70]
        for entry in low_competition[:5]:
            found = metrics_by_tag.get(entry.tag)
            if found:
                parts.append(
                    f"- **#{entry.tag}**: {found.tweets_per_hour:.1f} tweets/hr, {found.engagement_rate*100:.2f}% engagement\n"
                )

        parts.append("\n### Avoid These\n")

        rejected = [s for s in ranked if s.recommendation == "avoid"]
        for entry in rejected[:5]:
            parts.append(f"- #{entry.tag}: {', '.join(entry.notes)}\n")

        return "".join(parts)

Complete Usage Example

# main.py
import asyncio
from hashtag_scraper import HashtagScraper
from opportunity_scorer import OpportunityScorer
from niche_discoverer import NicheDiscoverer
from strategy_generator import StrategyGenerator
from hashtag_report import HashtagReportGenerator

async def main():
    """Run the full pipeline: scrape, score, discover, plan and report.

    Writes the markdown report to ``hashtag_report.md`` in the current
    working directory and prints progress to stdout.
    """
    # 1. Define hashtags to analyze
    hashtags_to_test = [
        "python", "programming", "coding", "developer",
        "tech", "startup", "ai", "machinelearning",
        "100DaysOfCode", "CodeNewbie", "DevCommunity"
    ]

    # 2. Scrape metrics
    scraper = HashtagScraper()
    metrics = await scraper.analyze_multiple(hashtags_to_test)

    print(f"Analyzed {len(metrics)} hashtags")

    # 3. Score opportunities
    scorer = OpportunityScorer()
    scores = [scorer.score(m, relevance=0.8) for m in metrics]

    # 4. Discover niche hashtags
    discoverer = NicheDiscoverer()
    niche_tags = await discoverer.discover_from_accounts(
        ['successful_account_1', 'successful_account_2'],
        tweets_per_account=50
    )

    print(f"Discovered {len(niche_tags)} niche hashtags")

    # 5. Generate strategy
    generator = StrategyGenerator(max_hashtags=5)
    strategy = generator.generate_strategy(scores)

    print("\nHashtag Strategy:")
    print(f"  Primary: {strategy.primary_tags}")
    print(f"  Secondary: {strategy.secondary_tags[:5]}")
    print(f"  Niche: {strategy.niche_tags}")

    # 6. Get tags for a post
    post_tags = generator.get_tags_for_post(strategy, max_tags=5)
    print(f"\nTags for next post: {' '.join('#' + t for t in post_tags)}")

    # 7. Generate report
    report_gen = HashtagReportGenerator()
    report = report_gen.generate_report(metrics, scores)

    # The report contains emoji, so the encoding must be explicit: the
    # platform default (e.g. cp1252 on Windows) cannot encode them and
    # would raise UnicodeEncodeError.
    with open("hashtag_report.md", "w", encoding="utf-8") as f:
        f.write(report)

    print("\nReport saved to hashtag_report.md")

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

Hashtag Count

  • Use 3-5 hashtags maximum
  • Quality over quantity
  • Mix popular and niche

Avoid

  • Banned or shadowbanned hashtags
  • Hashtags with >1M tweets/day
  • Irrelevant trending tags