Technical Details
A deep technical dive into how the MoBots Buy Bot works under the hood: blockchain monitoring, smart contract interactions, event processing, and system architecture.
System Architecture
High-Level Overview
Monad Blockchain
↓
Event Monitoring
↓
Transaction Analysis
↓
Notification Formatting
↓
Telegram API
↓
Your Community Group
Components
Blockchain Monitor
- Web3 connection to Monad RPC
- Block polling (2-second intervals)
- Event filtering and processing
- Transaction receipt analysis
Blockchain Integration
Monad Network Connection
Network Details:
- Chain: Monad
- Chain ID: 143
- Block Time: ~1 second
- Consensus: Proof of Stake
- Native Token: MON
RPC Configuration:
- Connection: WebSocket/HTTP
- Provider: Web3.py
- Timeout: 30 seconds
- Retry: 3 attempts
- Fallback: Multiple RPC endpoints
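The retry and fallback settings above can be sketched as a small helper. This is a minimal illustration, not the bot's actual code: `call_with_fallback`, the `fetch` callable, and the `base_delay` parameter are hypothetical names introduced here.

```python
import time
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")


def call_with_fallback(
    endpoints: Sequence[str],
    fetch: Callable[[str], T],
    retries: int = 3,
    base_delay: float = 0.0,
) -> T:
    """Try each endpoint in order, retrying each one up to `retries` times."""
    last_error = None
    for url in endpoints:
        for attempt in range(retries):
            try:
                return fetch(url)
            except Exception as exc:  # assumption: narrow this to RPC/network errors
                last_error = exc
                time.sleep(base_delay * (2 ** attempt))
    # Every endpoint failed on every attempt
    raise RuntimeError("all RPC endpoints failed") from last_error
```

With Web3.py, `fetch` might build a provider for the given URL (for example an `HTTPProvider` with a 30-second timeout in `request_kwargs`) and read `w3.eth.block_number`; the exact wiring depends on how the bot manages its connections.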
Smart Contract Interactions
Token Contract
ERC-20 Standard Methods:
# Get token decimals
decimals = token_contract.functions.decimals().call()
# Get token symbol
symbol = token_contract.functions.symbol().call()
# Get token name
name = token_contract.functions.name().call()
# Monitor Transfer events
transfer_filter = token_contract.events.Transfer.create_filter(
    fromBlock='latest',
    argument_filters={'from': pool_address}
)
Transfer Event Structure:
event Transfer(
    address indexed from,
    address indexed to,
    uint256 value
)
Factory Contracts
Uniswap V2 Interface:
# Get pool address for token pair
pool_address = factory.functions.getPair(
    token_address,
    wmon_address
).call()
# Get all pairs count
pairs_count = factory.functions.allPairsLength().call()
# Get specific pair by index
pair = factory.functions.allPairs(index).call()
Factory Addresses (Monad):
FACTORIES = {
    'Uniswap V2': '0x...',
    'PancakeSwap V2': '0x...',
    'OctoSwap V2': '0x...',
    'TayaSwap V2': '0x...',
    'Purps Exchange V2': '0x...',
    'Madness Finance V2': '0x...'
}
Pool Contracts
Pair Interface:
# Get tokens in pair
token0 = pool.functions.token0().call()
token1 = pool.functions.token1().call()
# Get reserves
reserves = pool.functions.getReserves().call()
# Returns: (reserve0, reserve1, blockTimestampLast)
# Monitor Sync events
sync_filter = pool.events.Sync.create_filter(
    fromBlock='latest'
)
Event Monitoring System
Block Polling
Polling Strategy:
import time
def monitor_blocks(last_processed_block):
    # The starting block is passed in so the loop can update it as a local
    while True:
        current_block = w3.eth.block_number
        if current_block > last_processed_block:
            process_blocks(last_processed_block + 1, current_block)
            last_processed_block = current_block
        time.sleep(2)  # Poll every 2 seconds
Why 2 seconds?
- Monad block time: ~1 second
- Allows 1-2 blocks per check
- Balance between latency and load
- Catches all blocks reliably: each check processes every block since the last processed one
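If the poller stalls (for example during an RPC outage), the gap between the last processed block and the chain head can grow well beyond one or two blocks. A hedged sketch of splitting that backlog into bounded ranges follows; `block_ranges` and `max_span` are hypothetical names, and the default of 100 blocks is an illustrative assumption, not a documented bot setting.

```python
from typing import Iterator, Tuple


def block_ranges(
    last_processed: int, current: int, max_span: int = 100
) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (start, end) ranges covering last_processed+1..current."""
    start = last_processed + 1
    while start <= current:
        end = min(start + max_span - 1, current)
        yield start, end
        start = end + 1
```

Each yielded pair could be passed to `process_blocks(start, end)`, so no block is skipped and no single query spans an unbounded range.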
Event Filtering
Transfer Event Detection:
def detect_buys(pool_address, token_contract, start_block, end_block):
    # Create event filter
    transfer_filter = token_contract.events.Transfer.create_filter(
        fromBlock=start_block,
        toBlock=end_block,
        argument_filters={
            'from': pool_address  # From pool = buy
        }
    )
    # Get all transfer events
    events = transfer_filter.get_all_entries()
    for event in events:
        # event['args']['from'] = pool_address
        # event['args']['to'] = buyer_address
        # event['args']['value'] = token_amount
        if is_buy_event(event):
            process_buy(event)
Buy vs Sell Detection:
- Buy: Token transfers FROM pool TO buyer
- Sell: Token transfers FROM seller TO pool
- Filter: Only monitor transfers FROM pool
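The direction rule above can be illustrated on a raw `Transfer` log. This is a sketch under stated assumptions: logs are taken in JSON-RPC form (hex strings for `topics` and `data`), and `classify_transfer`, `topic_to_address`, and `transfer_value` are hypothetical helper names, not functions from the bot.

```python
# keccak256("Transfer(address,address,uint256)"), the ERC-20 Transfer topic
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"


def topic_to_address(topic: str) -> str:
    """An indexed address is left-padded to 32 bytes; keep the last 20 bytes."""
    return "0x" + topic[-40:].lower()


def classify_transfer(log: dict, pool_address: str) -> str:
    """Return 'buy', 'sell', or 'other' for a raw Transfer log."""
    topics = log["topics"]
    if len(topics) != 3 or topics[0].lower() != TRANSFER_TOPIC:
        return "other"
    sender = topic_to_address(topics[1])
    recipient = topic_to_address(topics[2])
    pool = pool_address.lower()
    if sender == pool and recipient != pool:
        return "buy"   # tokens leave the pool
    if recipient == pool and sender != pool:
        return "sell"  # tokens enter the pool
    return "other"


def transfer_value(log: dict) -> int:
    """The non-indexed uint256 value is carried in the data field."""
    return int(log["data"], 16)
```

Note that a transfer out of a V2 pool can also be a liquidity removal, so a from-pool transfer alone may not be enough; an extra check (such as the presence of a swap in the same transaction) may be needed.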
Transaction Analysis
Processing Buy Transactions:
def analyze_buy(tx_hash, pool_address):
    # Get transaction receipt
    receipt = w3.eth.get_transaction_receipt(tx_hash)
    # Extract buyer address (the transaction sender)
    buyer = receipt['from']
    # Get token amount from Transfer event
    token_amount = parse_transfer_amount(receipt)
    # Get MON spent from WMON Transfer
    mon_spent = parse_mon_spent(receipt)
    # Identify DEX used
    dex_name = identify_dex(pool_address)
    # The receipt holds a block number, not a time, so read the block's timestamp
    block = w3.eth.get_block(receipt['blockNumber'])
    return {
        'buyer': buyer,
        'token_amount': token_amount,
        'mon_spent': mon_spent,
        'tx_hash': tx_hash,
        'dex': dex_name,
        'timestamp': block['timestamp']
    }
DEX Identification System
Dynamic Identification
Identification Process:
def identify_dex(pool_address):
    # Check cache first
    if pool_address in dex_cache:
        return dex_cache[pool_address]
    # Query each factory
    for dex_name, factory_address in FACTORIES.items():
        factory = w3.eth.contract(
            address=factory_address,
            abi=FACTORY_ABI
        )
        # Get pair for tokens
        pair = factory.functions.getPair(
            token_address,
            wmon_address
        ).call()
        # Check if matches
        if pair.lower() == pool_address.lower():
            dex_cache[pool_address] = dex_name
            return dex_name
    return 'Unknown'
Caching Strategy
Cache Implementation:
- Storage: In-memory dictionary
- Lifetime: Session-based (until restart)
- Size: Unlimited (pools are finite)
- Invalidation: Manual clear or restart
- Benefit: ~99% reduction in RPC calls (a cached pool skips the factory queries entirely)
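The caching behaviour listed above can be sketched as a small wrapper around any lookup function. `DexCache` and its `lookup` argument are hypothetical names used for illustration; normalising keys to lowercase is an assumption added here so that checksummed and lowercase forms of the same address share one entry.

```python
from typing import Callable, Dict


class DexCache:
    """Session-lifetime, in-memory cache of pool address -> DEX name."""

    def __init__(self, lookup: Callable[[str], str]) -> None:
        self._lookup = lookup  # expensive resolver, e.g. the factory queries
        self._names: Dict[str, str] = {}

    def get(self, pool_address: str) -> str:
        key = pool_address.lower()  # same address regardless of casing
        if key not in self._names:
            self._names[key] = self._lookup(key)
        return self._names[key]

    def clear(self) -> None:
        """Manual invalidation; a restart has the same effect."""
        self._names.clear()
```

Only the first `get` for a pool reaches the resolver; later calls are answered from memory until `clear()` or a restart.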
Cache Structure:
dex_cache = {
    '0x1234...': 'Uniswap V2',
    '0x5678...': 'PancakeSwap V2',
    '0x9abc...': 'OctoSwap V2'
}
Notification System
Message Formatting
MarkdownV2 Formatting:
def format_notification(buy_data):
    # Calculate green circles: one per 0.1 MON, capped at 10.
    # round() guards against float error (0.3 / 0.1 evaluates to 2.999...)
    circles = min(int(round(buy_data['mon_spent'] / 0.1, 6)), 10)
    indicator = '🟢' * circles
    # Format amounts
    token_amount = format_number(buy_data['token_amount'])
    mon_spent = f"{buy_data['mon_spent']:.4f}"
    # Shorten address
    buyer = shorten_address(buy_data['buyer'])
    # Build message. MarkdownV2 requires literal characters such as '.', '!'
    # and '-' to be backslash-escaped before the message is sent.
    message = f"""
{indicator} New Buy!
💰 Amount: {token_amount} {symbol}
💸 Spent: {mon_spent} MON
👤 Buyer: [{buyer}]({explorer_url}/address/{buy_data['buyer']})
🔍 Tx: [View on Explorer]({explorer_url}/tx/{buy_data['tx_hash']})
🔄 DEX: [{buy_data['dex']}]({dexscreener_url})
{format_social_links(config['social_links'])}
Powered by 🤖 Mobot's Monad Buy Bot
[Purchase on MoBots]({mobots_url})
"""
    return message
Image Handling
Image Upload:
def send_with_image(chat_id, message, image_url):
    try:
        # Try sending with photo
        bot.send_photo(
            chat_id=chat_id,
            photo=image_url,
            caption=message,
            parse_mode='MarkdownV2'
        )
    except Exception:
        # Fallback to text-only
        bot.send_message(
            chat_id=chat_id,
            text=message,
            parse_mode='MarkdownV2',
            disable_web_page_preview=False
        )
Image Requirements:
- Format: PNG, JPG
- Max size: 5MB (Telegram limit)
- Access: Publicly accessible URL
- Fallback: Text-only if image fails
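Both the photo caption and the text fallback are sent with `parse_mode='MarkdownV2'`, and Telegram rejects MarkdownV2 text that contains unescaped special characters. A minimal escaping sketch follows; `escape_markdown_v2` is a hypothetical helper name, and the character set is the one listed in Telegram's MarkdownV2 formatting rules.

```python
import re

# Characters that must be backslash-escaped in MarkdownV2 text
_MDV2_SPECIAL = r"_*[]()~`>#+-=|{}.!"
_MDV2_PATTERN = re.compile("([" + re.escape(_MDV2_SPECIAL) + "])")


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    return _MDV2_PATTERN.sub(r"\\\1", text)
```

Escaping applies to dynamic values (symbol, amounts, DEX name) and to literal text such as `New Buy!`, but not to the link syntax itself. python-telegram-bot also ships an `escape_markdown` helper that accepts `version=2`, which may be preferable to a hand-rolled function.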
Telegram API Integration
python-telegram-bot Library:
from telegram import Bot
from telegram.ext import Updater, CommandHandler
# Initialize bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# Send notification
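# Note: this is the synchronous call style; from python-telegram-bot 20.0
# onward, Bot methods are coroutines and must be awaited.
def send_notification(chat_id, message):
    bot.send_message(
        chat_id=chat_id,
        text=message,
        parse_mode='MarkdownV2',
        disable_web_page_preview=False
    )
Rate Limiting: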
- Telegram limit: roughly 30 messages/second per bot overall, and about 20 messages/minute to any single group
- Our implementation: Queued delivery
- Burst protection: Spread over time
- In practice, queued delivery keeps the bot under these limits
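Queued delivery can be sketched as a sender that spaces messages by a minimum interval. This is an illustrative sketch, not the bot's implementation: `PacedSender` and its parameters are hypothetical, and the clock and sleep functions are injectable only to make the pacing easy to test.

```python
import time
from collections import deque
from typing import Callable, Deque, Tuple


class PacedSender:
    """Queue messages and deliver them no faster than `rate` per second."""

    def __init__(
        self,
        send: Callable[[int, str], None],
        rate: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self._send = send
        self._interval = 1.0 / rate
        self._clock = clock
        self._sleep = sleep
        self._queue: Deque[Tuple[int, str]] = deque()
        self._next_slot = 0.0  # earliest time the next send may happen

    def enqueue(self, chat_id: int, text: str) -> None:
        self._queue.append((chat_id, text))

    def flush(self) -> int:
        """Send everything queued, spacing sends by the minimum interval."""
        sent = 0
        while self._queue:
            wait = self._next_slot - self._clock()
            if wait > 0:
                self._sleep(wait)
            chat_id, text = self._queue.popleft()
            self._send(chat_id, text)
            self._next_slot = self._clock() + self._interval
            sent += 1
        return sent
```

A burst of buys is enqueued immediately and drained at a steady pace, which spreads delivery over time instead of sending everything at once. A per-chat interval could be layered on top for the stricter per-group limit.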
Data Storage
MongoDB Schema
group_configs Collection:
{
    _id: ObjectId,
    chat_id: Number,             // Telegram group ID
    chat_title: String,          // Group name
    creator_id: Number,          // Creator's Telegram ID
    token_address: String,       // Monitored token
    pool_addresses: [String],    // Pool addresses
    dex_names: [String],         // DEX names per pool
    notification_image: String,  // Image URL
    minimum_amount: Number,      // Min buy threshold
    social_links: {
        discord: String,
        telegram: String,
        twitter: String,
        website: String
    },
    active: Boolean,             // Active/inactive
    created_at: Date,
    updated_at: Date
}
Indexes:
{ chat_id: 1 }     // Quick lookup by group
{ creator_id: 1 }  // List user's bots
{ active: 1 }      // Filter active bots
Performance Optimizations
Concurrency
Thread Pool:
from concurrent.futures import ThreadPoolExecutor
# Create thread pool
executor = ThreadPoolExecutor(max_workers=32)
# Process monitors concurrently
for monitor in active_monitors:
    executor.submit(run_monitor, monitor)
Benefits:
- Multiple monitors run simultaneously
- Non-blocking operations
- Scales to hundreds of bots
- Efficient resource usage
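One detail worth keeping in mind with `executor.submit`: an exception raised inside a worker is stored on the returned future and stays silent unless the future is inspected. The sketch below shows one way to surface such failures; `run_all` and `run_one` are hypothetical names, and it assumes each task performs a single pass and returns rather than looping forever.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(monitors, run_one, max_workers=32):
    """Run run_one(monitor) concurrently; return {monitor: exception or None}."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the monitor it was submitted for
        futures = {executor.submit(run_one, m): m for m in monitors}
        for future in as_completed(futures):
            # exception() returns None when the task finished normally
            results[futures[future]] = future.exception()
    return results
```

A failing monitor then shows up as an exception object in the result map, where it can be logged or restarted, while the other monitors complete normally.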
Transaction Deduplication
Cache Implementation:
# Sliding window cache
processed_txs = {} # {tx_hash: block_number}
CACHE_SIZE = 100 # blocks
def is_processed(tx_hash, current_block):
    if tx_hash in processed_txs:
        return True
    # Add to cache
    processed_txs[tx_hash] = current_block
    # Cleanup old entries
    if len(processed_txs) > 1000:
        cleanup_cache(current_block - CACHE_SIZE)
    return False
def cleanup_cache(min_block):
    for tx_hash, block in list(processed_txs.items()):
        if block < min_block:
            del processed_txs[tx_hash]
Benefits:
- Prevents duplicate notifications
- Memory efficient
- Automatic cleanup
- Handles reorganizations: a transaction re-included in a different block keeps its hash, so it is not announced twice
Security & Privacy
Data Handling
What We Store:
- ✅ Public blockchain data
- ✅ Public group IDs
- ✅ Creator Telegram IDs
- ✅ User-provided settings
What We Don't Store:
- ❌ Private keys
- ❌ Wallet seeds
- ❌ Chat messages
- ❌ Sensitive user data
Permission Verification
Creator Verification:
def verify_creator(user_id, chat_id):
    config = db.group_configs.find_one({'chat_id': chat_id})
    if not config:
        return False
    return config['creator_id'] == user_id
# Usage (inside a command handler)
if not verify_creator(message.from_user.id, chat_id):
    return "❌ Only the creator can manage this bot"
Reliability Features
Error Handling
from telegram.error import TelegramError
def safe_process_buy(buy_data, chat_id, image_url=None):
    try:
        # Format notification
        message = format_notification(buy_data)
        # Send to Telegram (with image where available)
        send_with_image(chat_id, message, image_url)
    except TelegramError as e:
        logger.error(f"Telegram error: {e}")
        # Retry with text-only
        send_notification(chat_id, message)
    except Exception as e:
        logger.error(f"Processing error: {e}")
        # Log but don't crash monitor
Retry Logic
def retry_operation(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
System Requirements
Server Specifications
Minimum:
- CPU: 2 cores
- RAM: 2GB
- Storage: 10GB
- Network: 10 Mbps
Recommended:
- CPU: 4+ cores
- RAM: 4GB+
- Storage: 20GB SSD
- Network: 100 Mbps
Dependencies
Python Libraries:
web3==6.0.0
python-telegram-bot==20.0
pymongo==4.0
requests==2.31.0
External Services:
- Monad RPC endpoint
- MongoDB instance
- Telegram Bot API
- Internet connection