Skip to content

Smart Engagement Pod System

Build a fair and intelligent engagement pod system with reciprocity tracking and anti-detection measures.


Overview

Educational Purpose

This recipe is for educational purposes only. Engagement pods may violate platform terms of service. Use responsibly and understand the risks.

This recipe creates a smart engagement pod system with:

  • Member discovery - Find compatible pod members
  • Reciprocity tracking - Ensure fair participation
  • Fairness algorithms - Balance engagement distribution
  • Anti-detection timing - Natural engagement patterns
  • Performance analytics - Track pod effectiveness
  • Health monitoring - Identify inactive members

System Architecture

┌─────────────────┐     ┌──────────────┐     ┌─────────────────┐
│  Pod            │────▶│  Reciprocity │────▶│  Engagement     │
│  Manager        │     │  Tracker     │     │  Queue          │
└─────────────────┘     └──────────────┘     └─────────────────┘
        │                       │                     │
        ▼                       ▼                     ▼
┌─────────────────┐     ┌──────────────┐     ┌─────────────────┐
│  Member         │     │  Fairness    │     │  Anti-Detection │
│  Discovery      │     │  Calculator  │     │  Scheduler      │
└─────────────────┘     └──────────────┘     └─────────────────┘

Data Models

# pod_models.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum

class EngagementType(Enum):
    LIKE = "like"
    RETWEET = "retweet"
    REPLY = "reply"
    QUOTE = "quote"

@dataclass
class PodMember:
    user_id: str
    username: str
    joined_at: datetime
    followers: int
    avg_engagement_rate: float
    timezone: str = "UTC"

    # Reciprocity metrics
    engagements_given: int = 0
    engagements_received: int = 0
    reciprocity_score: float = 1.0

    # Activity tracking
    last_active: Optional[datetime] = None
    consecutive_misses: int = 0
    is_active: bool = True

    # Performance
    avg_response_time_minutes: float = 0.0
    quality_score: float = 1.0

@dataclass
class EngagementRequest:
    id: str
    tweet_id: str
    tweet_url: str
    author_id: str
    author_username: str
    requested_at: datetime
    engagement_types: list[EngagementType]
    priority: int = 1  # Higher = more urgent

    # Fulfillment tracking
    fulfilled_by: list[str] = field(default_factory=list)
    deadline: Optional[datetime] = None

@dataclass
class EngagementRecord:
    request_id: str
    member_id: str
    engagement_type: EngagementType
    completed_at: datetime
    response_time_minutes: float

Pod Manager

# pod_manager.py
import asyncio
import uuid
from datetime import datetime, timedelta
from typing import Optional

from xeepy import Xeepy

from pod_models import PodMember, EngagementRequest, EngagementType

class PodManager:
    """Manage engagement pod operations."""

    def __init__(self, pod_name: str, max_members: int = 20):
        self.pod_name = pod_name
        self.max_members = max_members
        self.members: dict[str, PodMember] = {}
        self.pending_requests: list[EngagementRequest] = []
        self.completed_requests: list[EngagementRequest] = []

    def add_member(self, member: PodMember) -> bool:
        """Add member to pod."""
        if len(self.members) >= self.max_members:
            return False

        if member.user_id in self.members:
            return False

        self.members[member.user_id] = member
        return True

    def remove_member(self, user_id: str):
        """Remove member from pod."""
        if user_id in self.members:
            del self.members[user_id]

    def submit_request(
        self,
        tweet_id: str,
        tweet_url: str,
        author_id: str,
        engagement_types: list[EngagementType] = None,
        priority: int = 1
    ) -> EngagementRequest:
        """Submit engagement request to pod."""

        if author_id not in self.members:
            raise ValueError("Author must be a pod member")

        request = EngagementRequest(
            id=f"req_{uuid.uuid4().hex[:8]}",
            tweet_id=tweet_id,
            tweet_url=tweet_url,
            author_id=author_id,
            author_username=self.members[author_id].username,
            requested_at=datetime.now(),
            engagement_types=engagement_types or [EngagementType.LIKE],
            priority=priority,
            deadline=datetime.now() + timedelta(hours=24)
        )

        self.pending_requests.append(request)
        return request

    def get_queue_for_member(
        self, 
        member_id: str,
        limit: int = 10
    ) -> list[EngagementRequest]:
        """Get engagement queue for a specific member."""

        member = self.members.get(member_id)
        if not member:
            return []

        # Filter requests not from this member and not yet fulfilled by them
        eligible = [
            r for r in self.pending_requests
            if r.author_id != member_id and member_id not in r.fulfilled_by
        ]

        # Sort by priority and reciprocity
        def sort_key(req):
            author = self.members.get(req.author_id)
            author_reciprocity = author.reciprocity_score if author else 0
            return (req.priority, author_reciprocity, req.requested_at)

        eligible.sort(key=sort_key, reverse=True)

        return eligible[:limit]

    def record_engagement(
        self,
        request_id: str,
        member_id: str,
        engagement_type: EngagementType
    ):
        """Record completed engagement."""

        # Find request
        request = next(
            (r for r in self.pending_requests if r.id == request_id),
            None
        )

        if not request:
            return

        # Update request
        request.fulfilled_by.append(member_id)

        # Update member stats
        member = self.members.get(member_id)
        if member:
            member.engagements_given += 1
            member.last_active = datetime.now()

        author = self.members.get(request.author_id)
        if author:
            author.engagements_received += 1

        # Check if fully fulfilled
        if len(request.fulfilled_by) >= len(self.members) - 1:
            self.pending_requests.remove(request)
            self.completed_requests.append(request)

Reciprocity Tracker

# reciprocity_tracker.py
from datetime import datetime, timedelta
from collections import defaultdict

class ReciprocityTracker:
    """Track and enforce reciprocity in the pod."""

    def __init__(self, lookback_days: int = 7):
        self.lookback_days = lookback_days
        self.engagement_log: list[dict] = []

    def log_engagement(
        self,
        giver_id: str,
        receiver_id: str,
        engagement_type: str,
        timestamp: datetime = None
    ):
        """Log an engagement action."""
        self.engagement_log.append({
            'giver_id': giver_id,
            'receiver_id': receiver_id,
            'engagement_type': engagement_type,
            'timestamp': timestamp or datetime.now()
        })

    def calculate_reciprocity_scores(
        self,
        members: dict[str, 'PodMember']
    ) -> dict[str, float]:
        """Calculate reciprocity scores for all members."""

        cutoff = datetime.now() - timedelta(days=self.lookback_days)
        recent_logs = [l for l in self.engagement_log if l['timestamp'] > cutoff]

        # Count given and received
        given = defaultdict(int)
        received = defaultdict(int)

        for log in recent_logs:
            given[log['giver_id']] += 1
            received[log['receiver_id']] += 1

        # Calculate scores
        scores = {}
        for user_id in members:
            g = given.get(user_id, 0)
            r = received.get(user_id, 0)

            if r == 0:
                # Never received = perfect score (new member)
                scores[user_id] = 1.0
            elif g == 0:
                # Received but never gave = poor score
                scores[user_id] = 0.1
            else:
                # Ratio of given to received
                ratio = g / r
                # Score between 0.1 and 2.0
                scores[user_id] = min(2.0, max(0.1, ratio))

        return scores

    def get_debt_matrix(
        self,
        members: dict[str, 'PodMember']
    ) -> dict[str, dict[str, int]]:
        """Calculate engagement debt between pairs."""

        cutoff = datetime.now() - timedelta(days=self.lookback_days)
        recent_logs = [l for l in self.engagement_log if l['timestamp'] > cutoff]

        # Count pairwise engagements
        given_to = defaultdict(lambda: defaultdict(int))

        for log in recent_logs:
            given_to[log['giver_id']][log['receiver_id']] += 1

        # Calculate debt (positive = owes, negative = owed)
        debt = {}
        for a_id in members:
            debt[a_id] = {}
            for b_id in members:
                if a_id != b_id:
                    a_to_b = given_to[a_id][b_id]
                    b_to_a = given_to[b_id][a_id]
                    debt[a_id][b_id] = b_to_a - a_to_b  # How much A owes B

        return debt

    def identify_freeloaders(
        self,
        members: dict[str, 'PodMember'],
        threshold: float = 0.3
    ) -> list[str]:
        """Identify members who take more than they give."""

        scores = self.calculate_reciprocity_scores(members)
        return [uid for uid, score in scores.items() if score < threshold]

Fairness Calculator

# fairness_calculator.py
from typing import Optional

class FairnessCalculator:
    """Ensure fair engagement distribution."""

    def __init__(self, reciprocity_tracker: 'ReciprocityTracker'):
        self.tracker = reciprocity_tracker

    def prioritize_requests(
        self,
        requests: list['EngagementRequest'],
        members: dict[str, 'PodMember'],
        current_user_id: str
    ) -> list['EngagementRequest']:
        """Prioritize requests based on fairness."""

        # Get debt matrix
        debt = self.tracker.get_debt_matrix(members)
        user_debts = debt.get(current_user_id, {})

        # Get reciprocity scores
        scores = self.tracker.calculate_reciprocity_scores(members)

        def priority_score(request):
            author_id = request.author_id

            # Base priority
            score = request.priority * 10

            # Debt factor (prioritize those we owe)
            debt_to_author = user_debts.get(author_id, 0)
            score += debt_to_author * 5

            # Reciprocity factor (prioritize good contributors)
            author_reciprocity = scores.get(author_id, 1.0)
            score += author_reciprocity * 10

            # Urgency factor (older requests)
            hours_old = (datetime.now() - request.requested_at).total_seconds() / 3600
            score += min(hours_old, 24)  # Cap at 24 hours

            return score

        return sorted(requests, key=priority_score, reverse=True)

    def calculate_fair_quota(
        self,
        member: 'PodMember',
        total_pending: int,
        member_count: int
    ) -> int:
        """Calculate fair engagement quota for member."""

        base_quota = total_pending // (member_count - 1)

        # Adjust by reciprocity
        adjusted = int(base_quota * member.reciprocity_score)

        # Minimum and maximum bounds
        return max(1, min(adjusted, base_quota * 2))

Anti-Detection Scheduler

# anti_detection_scheduler.py
import random
from datetime import datetime, timedelta
from typing import Generator

class AntiDetectionScheduler:
    """Schedule engagements with natural timing patterns."""

    def __init__(
        self,
        min_delay_seconds: int = 30,
        max_delay_seconds: int = 300,
        daily_limit: int = 100,
        active_hours: tuple[int, int] = (8, 23)
    ):
        self.min_delay = min_delay_seconds
        self.max_delay = max_delay_seconds
        self.daily_limit = daily_limit
        self.active_hours = active_hours

        self.daily_count = 0
        self.last_reset = datetime.now().date()

    def get_next_delay(self) -> int:
        """Get natural-looking delay before next action."""

        # Check daily limit
        if datetime.now().date() != self.last_reset:
            self.daily_count = 0
            self.last_reset = datetime.now().date()

        if self.daily_count >= self.daily_limit:
            # Wait until tomorrow
            tomorrow = datetime.now().replace(
                hour=self.active_hours[0], 
                minute=0, 
                second=0
            ) + timedelta(days=1)
            return int((tomorrow - datetime.now()).total_seconds())

        # Check active hours
        current_hour = datetime.now().hour
        if not (self.active_hours[0] <= current_hour < self.active_hours[1]):
            # Wait until active hours
            if current_hour < self.active_hours[0]:
                wait_hours = self.active_hours[0] - current_hour
            else:
                wait_hours = 24 - current_hour + self.active_hours[0]
            return wait_hours * 3600 + random.randint(0, 1800)

        # Normal delay with natural variation
        base_delay = random.randint(self.min_delay, self.max_delay)

        # Add occasional longer pauses (simulates breaks)
        if random.random() < 0.1:  # 10% chance
            base_delay += random.randint(300, 900)  # 5-15 minute break

        # Add micro-variations
        variation = random.gauss(0, base_delay * 0.2)

        return max(self.min_delay, int(base_delay + variation))

    def schedule_batch(
        self,
        count: int
    ) -> Generator[datetime, None, None]:
        """Generate scheduled times for batch of engagements."""

        current_time = datetime.now()

        for _ in range(count):
            delay = self.get_next_delay()
            current_time += timedelta(seconds=delay)
            self.daily_count += 1
            yield current_time

    def is_safe_to_engage(self) -> bool:
        """Check if it's safe to perform engagement."""

        # Check daily limit
        if datetime.now().date() != self.last_reset:
            self.daily_count = 0
            self.last_reset = datetime.now().date()

        if self.daily_count >= self.daily_limit:
            return False

        # Check active hours
        current_hour = datetime.now().hour
        if not (self.active_hours[0] <= current_hour < self.active_hours[1]):
            return False

        return True

Pod Executor

# pod_executor.py
import asyncio
from datetime import datetime

from xeepy import Xeepy

from pod_models import EngagementType, EngagementRequest
from pod_manager import PodManager
from anti_detection_scheduler import AntiDetectionScheduler

class PodExecutor:
    """Execute pod engagement tasks."""

    def __init__(
        self,
        pod_manager: PodManager,
        scheduler: AntiDetectionScheduler
    ):
        self.pod = pod_manager
        self.scheduler = scheduler

    async def execute_queue(self, member_id: str):
        """Execute engagement queue for a member."""

        queue = self.pod.get_queue_for_member(member_id)

        if not queue:
            print("No pending engagements")
            return

        print(f"Processing {len(queue)} engagement requests...")

        async with Xeepy() as x:
            for request in queue:
                # Check if safe
                if not self.scheduler.is_safe_to_engage():
                    print("Rate limit reached, stopping")
                    break

                # Wait natural delay
                delay = self.scheduler.get_next_delay()
                print(f"Waiting {delay}s before next engagement...")
                await asyncio.sleep(delay)

                # Execute engagements
                for eng_type in request.engagement_types:
                    try:
                        await self._execute_engagement(
                            x, request.tweet_url, eng_type
                        )

                        self.pod.record_engagement(
                            request.id,
                            member_id,
                            eng_type
                        )

                        print(f"✓ {eng_type.value} on {request.tweet_url}")

                    except Exception as e:
                        print(f"✗ Failed: {e}")

    async def _execute_engagement(
        self,
        x: Xeepy,
        tweet_url: str,
        engagement_type: EngagementType
    ):
        """Execute single engagement action."""

        if engagement_type == EngagementType.LIKE:
            await x.engage.like(tweet_url)

        elif engagement_type == EngagementType.RETWEET:
            await x.engage.retweet(tweet_url)

        elif engagement_type == EngagementType.REPLY:
            # Would need reply text
            pass

        elif engagement_type == EngagementType.QUOTE:
            # Would need quote text
            pass

Usage Example

# main.py
import asyncio
from datetime import datetime

from pod_manager import PodManager
from pod_models import PodMember, EngagementType
from reciprocity_tracker import ReciprocityTracker
from anti_detection_scheduler import AntiDetectionScheduler
from pod_executor import PodExecutor

async def main():
    # Create pod
    pod = PodManager("tech_pod", max_members=10)

    # Add members
    members = [
        PodMember("1", "user1", datetime.now(), 5000, 0.05, "America/New_York"),
        PodMember("2", "user2", datetime.now(), 8000, 0.04, "Europe/London"),
        PodMember("3", "user3", datetime.now(), 3000, 0.06, "Asia/Tokyo"),
    ]

    for member in members:
        pod.add_member(member)

    # Submit engagement request
    request = pod.submit_request(
        tweet_id="123456789",
        tweet_url="https://x.com/user1/status/123456789",
        author_id="1",
        engagement_types=[EngagementType.LIKE, EngagementType.RETWEET],
        priority=2
    )

    print(f"Request submitted: {request.id}")

    # Execute for member 2
    scheduler = AntiDetectionScheduler(
        min_delay_seconds=60,
        max_delay_seconds=300,
        daily_limit=50
    )

    executor = PodExecutor(pod, scheduler)
    await executor.execute_queue("2")

    # Check reciprocity
    tracker = ReciprocityTracker()
    scores = tracker.calculate_reciprocity_scores(pod.members)

    print("\nReciprocity Scores:")
    for uid, score in scores.items():
        print(f"  {pod.members[uid].username}: {score:.2f}")

if __name__ == "__main__":
    asyncio.run(main())

Ethical Considerations

Platform Terms

Engagement pods may violate X/Twitter's Terms of Service. Accounts may be suspended. Use at your own risk.

Authenticity

  • Pod engagement creates artificial metrics
  • It can damage genuine community trust
  • Consider organic growth strategies instead

If You Must

  • Keep pods small (<10 members)
  • Focus on genuine content
  • Use natural timing
  • Don't over-engage
  • Be prepared for consequences