Code erstellt von OpenCode.ai

This commit is contained in:
ki
2026-03-15 09:12:21 +01:00
parent f3024c2594
commit f0c6f70b52
11 changed files with 419 additions and 0 deletions

151
bot.py Normal file
View File

@@ -0,0 +1,151 @@
import logging
import random
from telegram import Update
from telegram.ext import (
Application,
ContextTypes,
MessageHandler,
filters,
CommandHandler,
)
import sqlite3
from config import Config
from database import init_db, save_message, get_recent_messages, cleanup_old_messages, DB_PATH
from markov import build_markov_chain, generate_markov_sentence
def is_admin_or_owner(update: Update) -> tuple[bool, str]:
if update.message.chat.type not in ['group', 'supergroup']:
return True, ''
chat_id = update.message.chat_id
user_id = update.message.from_user.id
bot_id = update.bot.id
try:
chat_member = update.message.chat.get_member(bot_id)
status = chat_member.status
permissions = chat_member.permissions
if status not in ['administrator', 'creator']:
return False, 'Ich benötige Admin-Rechte, um zu funktionieren.'
if not permissions.can_send_messages:
return False, 'Ich benötige die Berechtigung, Nachrichten zu senden.'
return True, ''
except Exception as e:
return False, f'Fehler beim Prüfen der Admin-Rechte: {str(e)}'
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
is_admin, message = is_admin_or_owner(update)
if not is_admin:
if message:
await update.message.reply_text(message)
return
await update.message.reply_text(
'Hallo! Ich bin ein Markov-Bot. '
'Sage mir etwas und ich kann später etwas Ähnliches generieren.'
)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
message = update.message
chat_id = message.chat_id
user_id = message.from_user.id
username = message.from_user.username
text = message.text or ''
if not text:
return
save_message(chat_id, user_id, username, text)
cleanup_old_messages(chat_id, Config.MAX_MESSAGES)
async def generate_and_send(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.message.chat_id
recent_texts = get_recent_messages(chat_id, Config.MAX_MESSAGES)
if not recent_texts:
await update.message.reply_text('Noch keine Nachrichten zum Lernen.')
return
chain = build_markov_chain(recent_texts, order=2)
if not chain:
await update.message.reply_text('Nicht genug Daten für Markov-Kette.')
return
sentence = generate_markov_sentence(chain)
if sentence:
await update.message.reply_text(sentence)
else:
await update.message.reply_text('Konnte keinen Satz generieren.')
async def handle_mention(update: Update, context: ContextTypes.DEFAULT_TYPE):
message = update.message
if message.reply_to_message:
await generate_and_send(update, context)
return
bot_username = context.bot.username
text = message.text or ''
if f'@{bot_username}' in text:
await generate_and_send(update, context)
async def random_response(update: Update, context: ContextTypes.DEFAULT_TYPE):
if random.random() < Config.RANDOM_CHANCE:
await generate_and_send(update, context)
async def stats(update: Update, context: ContextTypes.DEFAULT_TYPE):
is_admin, message = is_admin_or_owner(update)
if not is_admin:
if message:
await update.message.reply_text(message)
return
chat_id = update.message.chat_id
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM messages WHERE chat_id = ?', (chat_id,))
count = cursor.fetchone()[0]
conn.close()
await update.message.reply_text(f'Gespeicherte Nachrichten in diesem Chat: {count}')
def main():
init_db()
if not Config.TELEGRAM_TOKEN:
raise ValueError('TELEGRAM_TOKEN nicht gesetzt in Environment')
application = Application.builder().token(Config.TELEGRAM_TOKEN).build()
application.add_handler(CommandHandler('start', start))
application.add_handler(CommandHandler('stats', stats))
application.add_handler(MessageHandler(filters.TEXT & filters.REPLY, handle_mention))
application.add_handler(MessageHandler(filters.TEXT & (filters.ALL & ~filters.COMMAND), handle_message))
application.add_handler(MessageHandler(filters.TEXT & (filters.ALL & ~filters.COMMAND), random_response))
logging.info('Bot startet...')
application.run_polling()
if __name__ == '__main__':
main()