152 lines
4.5 KiB
Python
152 lines
4.5 KiB
Python
import logging
|
|
import random
|
|
from telegram import Update
|
|
from telegram.ext import (
|
|
Application,
|
|
ContextTypes,
|
|
MessageHandler,
|
|
filters,
|
|
CommandHandler,
|
|
)
|
|
import sqlite3
|
|
|
|
from config import Config
|
|
from database import init_db, save_message, get_recent_messages, cleanup_old_messages, DB_PATH
|
|
from markov import build_markov_chain, generate_markov_sentence
|
|
|
|
|
|
def is_admin_or_owner(update: Update) -> tuple[bool, str]:
|
|
if update.message.chat.type not in ['group', 'supergroup']:
|
|
return True, ''
|
|
|
|
chat_id = update.message.chat_id
|
|
user_id = update.message.from_user.id
|
|
bot_id = update.bot.id
|
|
|
|
try:
|
|
chat_member = update.message.chat.get_member(bot_id)
|
|
status = chat_member.status
|
|
permissions = chat_member.permissions
|
|
|
|
if status not in ['administrator', 'creator']:
|
|
return False, 'Ich benötige Admin-Rechte, um zu funktionieren.'
|
|
|
|
if not permissions.can_send_messages:
|
|
return False, 'Ich benötige die Berechtigung, Nachrichten zu senden.'
|
|
|
|
return True, ''
|
|
except Exception as e:
|
|
return False, f'Fehler beim Prüfen der Admin-Rechte: {str(e)}'
|
|
|
|
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
is_admin, message = is_admin_or_owner(update)
|
|
|
|
if not is_admin:
|
|
if message:
|
|
await update.message.reply_text(message)
|
|
return
|
|
|
|
await update.message.reply_text(
|
|
'Hallo! Ich bin ein Markov-Bot. '
|
|
'Sage mir etwas und ich kann später etwas Ähnliches generieren.'
|
|
)
|
|
|
|
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
message = update.message
|
|
chat_id = message.chat_id
|
|
user_id = message.from_user.id
|
|
username = message.from_user.username
|
|
text = message.text or ''
|
|
|
|
if not text:
|
|
return
|
|
|
|
save_message(chat_id, user_id, username, text)
|
|
|
|
cleanup_old_messages(chat_id, Config.MAX_MESSAGES)
|
|
|
|
|
|
async def generate_and_send(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
chat_id = update.message.chat_id
|
|
|
|
recent_texts = get_recent_messages(chat_id, Config.MAX_MESSAGES)
|
|
|
|
if not recent_texts:
|
|
await update.message.reply_text('Noch keine Nachrichten zum Lernen.')
|
|
return
|
|
|
|
chain = build_markov_chain(recent_texts, order=2)
|
|
|
|
if not chain:
|
|
await update.message.reply_text('Nicht genug Daten für Markov-Kette.')
|
|
return
|
|
|
|
sentence = generate_markov_sentence(chain)
|
|
|
|
if sentence:
|
|
await update.message.reply_text(sentence)
|
|
else:
|
|
await update.message.reply_text('Konnte keinen Satz generieren.')
|
|
|
|
|
|
async def handle_mention(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
message = update.message
|
|
|
|
if message.reply_to_message:
|
|
await generate_and_send(update, context)
|
|
return
|
|
|
|
bot_username = context.bot.username
|
|
text = message.text or ''
|
|
|
|
if f'@{bot_username}' in text:
|
|
await generate_and_send(update, context)
|
|
|
|
|
|
async def random_response(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
if random.random() < Config.RANDOM_CHANCE:
|
|
await generate_and_send(update, context)
|
|
|
|
|
|
async def stats(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
is_admin, message = is_admin_or_owner(update)
|
|
|
|
if not is_admin:
|
|
if message:
|
|
await update.message.reply_text(message)
|
|
return
|
|
|
|
chat_id = update.message.chat_id
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT COUNT(*) FROM messages WHERE chat_id = ?', (chat_id,))
|
|
count = cursor.fetchone()[0]
|
|
conn.close()
|
|
|
|
await update.message.reply_text(f'Gespeicherte Nachrichten in diesem Chat: {count}')
|
|
|
|
|
|
def main():
|
|
init_db()
|
|
|
|
if not Config.TELEGRAM_TOKEN:
|
|
raise ValueError('TELEGRAM_TOKEN nicht gesetzt in Environment')
|
|
|
|
application = Application.builder().token(Config.TELEGRAM_TOKEN).build()
|
|
|
|
application.add_handler(CommandHandler('start', start))
|
|
application.add_handler(CommandHandler('stats', stats))
|
|
application.add_handler(MessageHandler(filters.TEXT & filters.REPLY, handle_mention))
|
|
application.add_handler(MessageHandler(filters.TEXT & (filters.ALL & ~filters.COMMAND), handle_message))
|
|
application.add_handler(MessageHandler(filters.TEXT & (filters.ALL & ~filters.COMMAND), random_response))
|
|
|
|
logging.info('Bot startet...')
|
|
application.run_polling()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|