import logging
import os
import re

from dotenv import load_dotenv
from firebase_admin import credentials, firestore, initialize_app
from telegram import KeyboardButton, ReplyKeyboardMarkup, Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters

# Load environment variables from a local .env file, if one is present.
load_dotenv()

# Set up logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
# python-telegram-bot sends its requests through httpx, which logs every
# request URL at INFO level. Bot API URLs embed the bot token, so keep that
# logger at WARNING to avoid writing the token into the logs.
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

# Initialize Firebase from environment variables. Any failure is logged and
# re-raised so the process stops instead of running without a database.
try:
    # Validate the required settings first so a missing variable is reported
    # by name rather than surfacing as an opaque credential error.
    required_vars = ['FIREBASE_PROJECT_ID', 'FIREBASE_PRIVATE_KEY', 'FIREBASE_CLIENT_EMAIL']
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

    # The private key is stored with literal "\n" sequences; turn them back
    # into real newlines before handing the key to the credential loader.
    private_key = os.getenv('FIREBASE_PRIVATE_KEY').replace('\\n', '\n')

    firebase_config = {
        "type": "service_account",
        "project_id": os.getenv('FIREBASE_PROJECT_ID'),
        "private_key_id": os.getenv('FIREBASE_PRIVATE_KEY_ID'),
        "private_key": private_key,
        "client_email": os.getenv('FIREBASE_CLIENT_EMAIL'),
        "client_id": os.getenv('FIREBASE_CLIENT_ID'),
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
        "client_x509_cert_url": os.getenv('FIREBASE_CLIENT_CERT_URL'),
        "universe_domain": "googleapis.com"
    }

    cred = credentials.Certificate(firebase_config)
    firebase_app = initialize_app(cred)
    db = firestore.client()
    logger.info("Firebase initialized successfully")
except Exception as e:
    logger.error(f"Error initializing Firebase: {e}")
    raise

# Telegram Bot Token
BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
if not BOT_TOKEN:
    logger.error("TELEGRAM_BOT_TOKEN environment variable is not set")
    raise ValueError("TELEGRAM_BOT_TOKEN environment variable is not set")

def normalize_phone_number(phone_number):
    """Normalize a phone number to its digits, keeping a leading ``+``.

    Args:
        phone_number: Raw phone number string; may be ``None`` or empty.

    Returns:
        ``""`` for a falsy input. Otherwise the digits of the input, prefixed
        with ``+`` only when the input itself starts with ``+``.
    """
    if not phone_number:
        return ""

    if phone_number.startswith('+'):
        return '+' + re.sub(r'\D', '', phone_number[1:])
    return re.sub(r'\D', '', phone_number)


def _phones_match(first, second):
    """Return True when both numbers hold the same, non-empty digit sequence.

    The leading ``+`` is ignored: the Bot API gives no format guarantee for a
    contact's phone number, so it need not use the same prefix convention as
    the number stored in the verification document.
    """
    first_digits = normalize_phone_number(first).lstrip('+')
    second_digits = normalize_phone_number(second).lstrip('+')
    return bool(first_digits) and first_digits == second_digits


def _contact_keyboard():
    """Build the one-time reply keyboard holding a single share-contact button."""
    button = KeyboardButton("Share Phone Number", request_contact=True)
    return ReplyKeyboardMarkup([[button]], resize_keyboard=True, one_time_keyboard=True)


def _mark_user_verified(user_ref, telegram_user):
    """Record the verified flag and the Telegram identity on a user document.

    Args:
        user_ref: Firestore document reference of the website user.
        telegram_user: Telegram user whose ``username`` and ``id`` are stored.
    """
    update_data = {
        'phoneVerified': True,
        'telegramUsername': telegram_user.username,
        'telegramId': telegram_user.id,
        'verifiedAt': firestore.SERVER_TIMESTAMP,
    }
    logger.info(f"Updating user document with: {update_data}")
    # ``db`` comes from firebase_admin's synchronous ``firestore.client()``,
    # so ``update`` returns a plain result object and must NOT be awaited
    # (awaiting it raises TypeError after the write has already happened).
    user_ref.update(update_data)


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start.

    With an argument, the first argument is treated as a verification code and
    passed to ``handle_verification``. Without arguments, the user is asked to
    share their contact.
    """
    # effective_message also covers edited messages, where update.message is None.
    message = update.effective_message
    try:
        user = update.effective_user
        args = context.args
        logger.info(f"Start command received from user {user.id} with args: {args}")

        if args:
            verification_code = args[0]
            logger.info(f"Verification code: {verification_code}")
            await handle_verification(update, context, verification_code)
        else:
            # Ask for phone number if no verification code provided
            await message.reply_text(
                "Welcome to Dopamine Quiz Bot! 👋\n\n"
                "To verify your phone number, please share your contact using the button below.",
                reply_markup=_contact_keyboard()
            )
    except Exception as e:
        logger.error(f"Error in start command: {e}", exc_info=True)
        await message.reply_text("An error occurred. Please try again.")


async def handle_verification(update: Update, context: ContextTypes.DEFAULT_TYPE, verification_code: str) -> None:
    """Validate a verification code and ask the user to share their contact.

    Looks up ``telegramVerifications/<verification_code>`` and the referenced
    ``users`` document. When both exist, the verification data and code are
    stored in ``context.user_data`` so ``handle_contact`` can finish the flow.

    Args:
        update: Incoming Telegram update.
        context: Handler context; its ``user_data`` receives the pending data.
        verification_code: Code taken from the /start argument.
    """
    message = update.effective_message
    try:
        logger.info(f"Handling verification for code: {verification_code}")

        # Look up the verification code in Firestore
        verification_ref = db.collection('telegramVerifications').document(verification_code)
        verification_doc = verification_ref.get()
        logger.info(f"Verification document exists: {verification_doc.exists}")

        if not verification_doc.exists:
            await message.reply_text("Invalid verification code. Please try again from the website.")
            return

        verification_data = verification_doc.to_dict()
        logger.info(f"Verification data: {verification_data}")

        site_user_id = verification_data.get('userId')
        expected_phone = verification_data.get('phone')
        if not site_user_id or not expected_phone:
            logger.error("No user ID or phone found in verification data")
            await message.reply_text("Invalid verification data. Please try again from the website.")
            return

        # Make sure the referenced website user actually exists
        user_doc = db.collection('users').document(site_user_id).get()
        logger.info(f"User document exists: {user_doc.exists}")
        if not user_doc.exists:
            await message.reply_text("User not found. Please try again from the website.")
            return

        # Ask user to share their phone number via contact button
        await message.reply_text(
            "Please share your phone number to complete verification.\n\n"
            "Tap the button below to share your contact information.",
            reply_markup=_contact_keyboard()
        )

        # Store verification data in context for later use
        context.user_data['verification_data'] = verification_data
        context.user_data['verification_code'] = verification_code
    except Exception as e:
        logger.error(f"Error in handle_verification: {e}", exc_info=True)
        await message.reply_text("An error occurred during verification. Please try again.")


async def _verify_against_code(update, context, shared_phone_normalized):
    """Finish a verification that was started with ``/start <code>``.

    Compares the shared phone with the one stored in ``context.user_data``,
    marks the user verified, deletes the verification document and clears the
    pending data from the context.
    """
    message = update.effective_message
    verification_data = context.user_data.get('verification_data')
    verification_code = context.user_data.get('verification_code')

    expected_phone = verification_data.get('phone')
    site_user_id = verification_data.get('userId')
    if not expected_phone or not site_user_id:
        await message.reply_text("Verification data is incomplete. Please try again.")
        return

    # Normalize the expected phone number for comparison
    expected_phone_normalized = normalize_phone_number(expected_phone)
    logger.info(f"Comparing shared: {shared_phone_normalized} with expected: {expected_phone_normalized}")

    if not _phones_match(shared_phone_normalized, expected_phone_normalized):
        await message.reply_text(
            "❌ Phone number verification failed!\n\n"
            f"Your Telegram phone number ({shared_phone_normalized}) does not match "
            f"the expected phone number ({expected_phone_normalized}).\n\n"
            "Please make sure you're using the same phone number that you registered with."
        )
        return

    user_ref = db.collection('users').document(site_user_id)
    if not user_ref.get().exists:
        await message.reply_text("User not found. Please try again from the website.")
        return

    _mark_user_verified(user_ref, update.effective_user)

    # Delete the verification code to prevent reuse
    db.collection('telegramVerifications').document(verification_code).delete()
    logger.info("Verification document deleted")

    # Clear context data
    context.user_data.pop('verification_data', None)
    context.user_data.pop('verification_code', None)

    await message.reply_text(
        "✅ Your phone number has been verified successfully!\n\n"
        "You can now return to the website to continue."
    )
    logger.info(f"User {site_user_id} verified successfully via contact sharing")


async def _verify_against_pending(update, shared_phone_normalized):
    """Match a shared phone against every pending verification document.

    Used when the contact arrives without a preceding ``/start <code>``. The
    first pending verification whose phone matches and whose user document
    exists is completed and then deleted.
    """
    message = update.effective_message
    found = False

    # Stored phone numbers are not normalized, so every pending document has
    # to be read and compared after normalization.
    for doc in db.collection('telegramVerifications').stream():
        verification_data = doc.to_dict()
        logger.info(f"Checking verification data: {verification_data}")

        expected_phone = verification_data.get('phone')
        site_user_id = verification_data.get('userId')
        if not expected_phone or not site_user_id:
            continue

        if not _phones_match(shared_phone_normalized, expected_phone):
            continue

        user_ref = db.collection('users').document(site_user_id)
        if not user_ref.get().exists:
            continue

        _mark_user_verified(user_ref, update.effective_user)

        # Delete the verification document
        doc.reference.delete()
        found = True
        logger.info(f"User {site_user_id} verified successfully via contact sharing")
        break

    if found:
        await message.reply_text(
            "✅ Your phone number has been verified successfully!\n\n"
            "You can now return to the website to continue."
        )
    else:
        await message.reply_text(
            "No pending verification found for this phone number. "
            "Please start the verification process from the website first."
        )


async def handle_contact(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a shared contact and complete the phone verification.

    The contact must belong to the sender. If ``context.user_data`` holds
    pending data from ``handle_verification`` the phone is checked against it;
    otherwise all pending verification documents are searched for the phone.
    """
    message = update.effective_message
    try:
        contact = message.contact
        sender = update.effective_user
        logger.info(f"Contact shared by user {contact.user_id}")

        # ``contact.user_id`` is optional and is absent for contacts that are
        # not Telegram users. Require an exact match so that forwarding an
        # arbitrary address-book entry cannot verify someone else's number.
        if sender is None or contact.user_id != sender.id:
            await message.reply_text("Please share your own contact information.")
            return

        phone_number = contact.phone_number
        if not phone_number:
            await message.reply_text("No phone number found in contact.")
            return
        logger.info(f"Phone number received: {phone_number}")

        # Normalize the shared phone number
        shared_phone_normalized = normalize_phone_number(phone_number)
        logger.info(f"Normalized shared phone: {shared_phone_normalized}")

        # Pending data is present only after a successful /start <code>
        has_pending_code = (
            context.user_data.get('verification_data')
            and context.user_data.get('verification_code')
        )
        if has_pending_code:
            await _verify_against_code(update, context, shared_phone_normalized)
        else:
            await _verify_against_pending(update, shared_phone_normalized)
    except Exception as e:
        logger.error(f"Error in handle_contact: {e}", exc_info=True)
        await message.reply_text("An error occurred while processing your contact. Please try again.")


async def debug_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the caller's Telegram id and names as a bot health check."""
    # Imported locally so this block does not depend on a file-level import.
    from telegram.helpers import escape_markdown

    message = update.effective_message
    try:
        user = update.effective_user

        def safe(value):
            # Names are user-controlled; an unescaped "_" or "*" would make
            # Telegram reject the message when it parses the Markdown.
            return escape_markdown(str(value))

        text = (
            f"🤖 *Bot Debug Information*\n\n"
            f"*User ID:* {user.id}\n"
            f"*Username:* {safe(user.username)}\n"
            f"*First Name:* {safe(user.first_name)}\n"
            f"*Last Name:* {safe(user.last_name)}\n\n"
            f"Bot is running correctly. Use /start with your verification code to begin."
        )
        await message.reply_text(text, parse_mode='Markdown')
    except Exception as e:
        logger.error(f"Error in debug_command: {e}")
        await message.reply_text("Error generating debug information.")


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send the usage instructions when the command /help is issued."""
    try:
        await update.effective_message.reply_text(
            "🤖 *Dopamine Quiz Bot Help*\n\n"
            "This bot helps verify your phone number for the Dopamine Quiz website.\n\n"
            "*How to use:*\n"
            "1. Start the verification process on the website\n"
            "2. Click the 'Verify via Telegram' button\n"
            "3. This bot will automatically verify your phone number\n\n"
            "If you have any issues, please contact support.\n\n"
            "Use /debug to check bot status.",
            parse_mode='Markdown'
        )
    except Exception as e:
        logger.error(f"Error in help_command: {e}")


async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Tell the caller whether a user document with their Telegram id is verified."""
    message = update.effective_message
    try:
        telegram_id = update.effective_user.id
        matches = db.collection('users').where('telegramId', '==', telegram_id).stream()

        # Verified as soon as any matching user document carries the flag
        verified = any(doc.to_dict().get('phoneVerified') for doc in matches)

        if verified:
            await message.reply_text(
                "✅ Your phone number is already verified!\n\n"
                "You can return to the website to continue."
            )
        else:
            await message.reply_text(
                "Your phone number is not yet verified.\n\n"
                "Please start the verification process from the website first."
            )
    except Exception as e:
        logger.error(f"Error in status_command: {e}")
        await message.reply_text("An error occurred while checking your status. Please try again.")


async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Log errors caused by Updates."""
    logger.error(f"Exception while handling an update: {context.error}", exc_info=context.error)


def main() -> None:
    """Build the application, register all handlers and start polling."""
    # Create the Application and pass it your bot's token.
    application = Application.builder().token(BOT_TOKEN).build()

    # Add handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("status", status_command))
    application.add_handler(CommandHandler("debug", debug_command))
    application.add_handler(MessageHandler(filters.CONTACT, handle_contact))

    # Add error handler
    application.add_error_handler(error_handler)

    # Run the bot until the user presses Ctrl-C
    logger.info("Bot started successfully")
    application.run_polling()


if __name__ == "__main__":
    main()