Group Notification System
Technical Details

Technical Details

Deep technical dive into how MoBots Buy Bot works under the hood. Explore blockchain monitoring, smart contract interactions, event processing, and system architecture.


System Architecture

High-Level Overview

Monad Blockchain

  Event Monitoring

  Transaction Analysis

  Notification Formatting

  Telegram API

  Your Community Group

Components

Blockchain Monitor

  • Web3 connection to Monad RPC
  • Block polling (2-second intervals)
  • Event filtering and processing
  • Transaction receipt analysis

Configuration Database

  • MongoDB storage
  • Bot configurations
  • Pool addresses
  • User settings
  • Social links

Notification Engine

  • Message formatting
  • Image attachment
  • Link generation
  • Telegram API integration
  • Delivery queue

DEX Identifier

  • Factory contract queries
  • Pool-to-DEX mapping
  • Cache management
  • Dynamic identification

Blockchain Integration

Monad Network Connection

Network Details:

  • Chain: Monad
  • Chain ID: 143
  • Block Time: ~1 second
  • Consensus: Proof of Stake
  • Native Token: MON

RPC Configuration:

  • Connection: WebSocket/HTTP
  • Provider: Web3.py
  • Timeout: 30 seconds
  • Retry: 3 attempts
  • Fallback: Multiple RPC endpoints

Smart Contract Interactions

Token Contract

ERC-20 Standard Methods:

# Get token decimals
decimals = token_contract.functions.decimals().call()
 
# Get token symbol
symbol = token_contract.functions.symbol().call()
 
# Get token name
name = token_contract.functions.name().call()
 
# Monitor Transfer events
transfer_filter = token_contract.events.Transfer.create_filter(
    fromBlock='latest',
    argument_filters={'from': pool_address}
)

Transfer Event Structure:

event Transfer(
    address indexed from,
    address indexed to,
    uint256 value
)

Factory Contracts

Uniswap V2 Interface:

# Get pool address for token pair
pool_address = factory.functions.getPair(
    token_address,
    wmon_address
).call()
 
# Get all pairs count
pairs_count = factory.functions.allPairsLength().call()
 
# Get specific pair by index
pair = factory.functions.allPairs(index).call()

Factory Addresses (Monad):

FACTORIES = {
    'Uniswap V2': '0x...',
    'PancakeSwap V2': '0x...',
    'OctoSwap V2': '0x...',
    'TayaSwap V2': '0x...',
    'Purps Exchange V2': '0x...',
    'Madness Finance V2': '0x...'
}

Pool Contracts

Pair Interface:

# Get tokens in pair
token0 = pool.functions.token0().call()
token1 = pool.functions.token1().call()
 
# Get reserves
reserves = pool.functions.getReserves().call()
# Returns: (reserve0, reserve1, blockTimestampLast)
 
# Monitor Sync events
sync_filter = pool.events.Sync.create_filter(
    fromBlock='latest'
)

Event Monitoring System

Block Polling

Polling Strategy:

def monitor_blocks():
    while True:
        current_block = w3.eth.block_number
 
        if current_block > last_processed_block:
            process_blocks(last_processed_block + 1, current_block)
            last_processed_block = current_block
 
        time.sleep(2)  # Poll every 2 seconds

Why 2 seconds?

  • Monad block time: ~1 second
  • Allows 1-2 blocks per check
  • Balance between latency and load
  • Catches all blocks reliably

Event Filtering

Transfer Event Detection:

def detect_buys(pool_address, token_address):
    # Create event filter
    transfer_filter = token_contract.events.Transfer.create_filter(
        fromBlock=start_block,
        toBlock=end_block,
        argument_filters={
            'from': pool_address  # From pool = buy
        }
    )
 
    # Get all transfer events
    events = transfer_filter.get_all_entries()
 
    for event in events:
        # event['args']['from'] = pool_address
        # event['args']['to'] = buyer_address
        # event['args']['value'] = token_amount
 
        if is_buy_event(event):
            process_buy(event)

Buy vs Sell Detection:

  • Buy: Token transfers FROM pool TO buyer
  • Sell: Token transfers FROM seller TO pool
  • Filter: Only monitor transfers FROM pool

Transaction Analysis

Processing Buy Transactions:

def analyze_buy(tx_hash):
    # Get transaction receipt
    receipt = w3.eth.get_transaction_receipt(tx_hash)
 
    # Extract buyer address
    buyer = receipt['from']
 
    # Get token amount from Transfer event
    token_amount = parse_transfer_amount(receipt)
 
    # Get MON spent from WMON Transfer
    mon_spent = parse_mon_spent(receipt)
 
    # Identify DEX used
    dex_name = identify_dex(pool_address)
 
    return {
        'buyer': buyer,
        'token_amount': token_amount,
        'mon_spent': mon_spent,
        'tx_hash': tx_hash,
        'dex': dex_name,
        'timestamp': receipt['blockNumber']
    }

DEX Identification System

Dynamic Identification

Identification Process:

def identify_dex(pool_address):
    # Check cache first
    if pool_address in dex_cache:
        return dex_cache[pool_address]
 
    # Query each factory
    for dex_name, factory_address in FACTORIES.items():
        factory = w3.eth.contract(
            address=factory_address,
            abi=FACTORY_ABI
        )
 
        # Get pair for tokens
        pair = factory.functions.getPair(
            token_address,
            wmon_address
        ).call()
 
        # Check if matches
        if pair.lower() == pool_address.lower():
            dex_cache[pool_address] = dex_name
            return dex_name
 
    return 'Unknown'

Caching Strategy

Cache Implementation:

  • Storage: In-memory dictionary
  • Lifetime: Session-based (until restart)
  • Size: Unlimited (pools are finite)
  • Invalidation: Manual clear or restart
  • Benefit: 99% reduction in RPC calls

Cache Structure:

dex_cache = {
    '0x1234...': 'Uniswap V2',
    '0x5678...': 'PancakeSwap V2',
    '0x9abc...': 'OctoSwap V2'
}

Notification System

Message Formatting

MarkdownV2 Formatting:

def format_notification(buy_data):
    # Calculate green circles
    circles = min(int(buy_data['mon_spent'] / 0.1), 10)
    indicator = '🟢' * circles
 
    # Format amounts
    token_amount = format_number(buy_data['token_amount'])
    mon_spent = f"{buy_data['mon_spent']:.4f}"
 
    # Shorten address
    buyer = shorten_address(buy_data['buyer'])
 
    # Build message
    message = f"""
{indicator} New Buy!
 
💰 Amount: {token_amount} {symbol}
💸 Spent: {mon_spent} MON
👤 Buyer: [{buyer}]({explorer_url}/address/{buy_data['buyer']})
🔍 Tx: [View on Explorer]({explorer_url}/tx/{buy_data['tx_hash']})
🔄 DEX: [{buy_data['dex']}]({dexscreener_url})
 
{format_social_links(config['social_links'])}
 
Powered by 🤖 Mobot's Monad Buy Bot
[Purchase on MoBots]({mobots_url})
"""
 
    return message

Image Handling

Image Upload:

def send_with_image(chat_id, message, image_url):
    try:
        # Try sending with photo
        bot.send_photo(
            chat_id=chat_id,
            photo=image_url,
            caption=message,
            parse_mode='MarkdownV2'
        )
    except Exception as e:
        # Fallback to text-only
        bot.send_message(
            chat_id=chat_id,
            text=message,
            parse_mode='MarkdownV2',
            disable_web_page_preview=False
        )

Image Requirements:

  • Format: PNG, JPG
  • Max size: 5MB (Telegram limit)
  • Access: Publicly accessible URL
  • Fallback: Text-only if image fails

Telegram API Integration

python-telegram-bot Library:

from telegram import Bot
from telegram.ext import Updater, CommandHandler
 
# Initialize bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)
 
# Send notification
def send_notification(chat_id, message):
    bot.send_message(
        chat_id=chat_id,
        text=message,
        parse_mode='MarkdownV2',
        disable_web_page_preview=False
    )

Rate Limiting:

  • Telegram limit: 30 messages/second per bot
  • Our implementation: Queued delivery
  • Burst protection: Spread over time
  • Never hit limits in practice

Data Storage

MongoDB Schema

group_configs Collection:

{
  _id: ObjectId,
  chat_id: Number,                    // Telegram group ID
  chat_title: String,                 // Group name
  creator_id: Number,                 // Creator's Telegram ID
  token_address: String,              // Monitored token
  pool_addresses: [String],           // Pool addresses
  dex_names: [String],                // DEX names per pool
  notification_image: String,         // Image URL
  minimum_amount: Number,             // Min buy threshold
  social_links: {
    discord: String,
    telegram: String,
    twitter: String,
    website: String
  },
  active: Boolean,                    // Active/inactive
  created_at: Date,
  updated_at: Date
}

Indexes:

{
  chat_id: 1;
} // Quick lookup by group
{
  creator_id: 1;
} // List user's bots
{
  active: 1;
} // Filter active bots

Performance Optimizations

Concurrency

Thread Pool:

from concurrent.futures import ThreadPoolExecutor
 
# Create thread pool
executor = ThreadPoolExecutor(max_workers=32)
 
# Process monitors concurrently
for monitor in active_monitors:
    executor.submit(run_monitor, monitor)

Benefits:

  • Multiple monitors run simultaneously
  • Non-blocking operations
  • Scales to hundreds of bots
  • Efficient resource usage

Transaction Deduplication

Cache Implementation:

# Sliding window cache
processed_txs = {}  # {tx_hash: block_number}
CACHE_SIZE = 100  # blocks
 
def is_processed(tx_hash, current_block):
    if tx_hash in processed_txs:
        return True
 
    # Add to cache
    processed_txs[tx_hash] = current_block
 
    # Cleanup old entries
    if len(processed_txs) > 1000:
        cleanup_cache(current_block - CACHE_SIZE)
 
    return False
 
def cleanup_cache(min_block):
    for tx_hash, block in list(processed_txs.items()):
        if block < min_block:
            del processed_txs[tx_hash]

Benefits:

  • Prevents duplicate notifications
  • Memory efficient
  • Automatic cleanup
  • Handles reorganizations

Security & Privacy

Data Handling

What We Store:

  • ✅ Public blockchain data
  • ✅ Public group IDs
  • ✅ Creator Telegram IDs
  • ✅ User-provided settings

What We Don't Store:

  • ❌ Private keys
  • ❌ Wallet seeds
  • ❌ Chat messages
  • ❌ Sensitive user data

Permission Verification

Creator Verification:

def verify_creator(user_id, chat_id):
    config = db.group_configs.find_one({'chat_id': chat_id})
 
    if not config:
        return False
 
    return config['creator_id'] == user_id
 
# Usage
if not verify_creator(message.from_user.id, chat_id):
    return "❌ Only the creator can manage this bot"

Reliability Features

Error Handling

def safe_process_buy(buy_data):
    try:
        # Format notification
        message = format_notification(buy_data)
 
        # Send to Telegram
        send_notification(chat_id, message, image_url)
 
    except TelegramError as e:
        logger.error(f"Telegram error: {e}")
        # Retry with text-only
        send_notification(chat_id, message, image_url=None)
 
    except Exception as e:
        logger.error(f"Processing error: {e}")
        # Log but don't crash monitor

Retry Logic

def retry_operation(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff

System Requirements

Server Specifications

Minimum:

  • CPU: 2 cores
  • RAM: 2GB
  • Storage: 10GB
  • Network: 10 Mbps

Recommended:

  • CPU: 4+ cores
  • RAM: 4GB+
  • Storage: 20GB SSD
  • Network: 100 Mbps

Dependencies

Python Libraries:

web3==6.0.0
python-telegram-bot==20.0
pymongo==4.0
requests==2.31.0

External Services:

  • Monad RPC endpoint
  • MongoDB instance
  • Telegram Bot API
  • Internet connection

Next Steps

❓ FAQ

Frequently asked questions about the bot.

🔧 Advanced Features

Learn about advanced bot capabilities.

📘 Getting Started

Ready to use it? Start with the setup guide.