"""SQLite-backed storage for chat messages.

All functions open a short-lived connection to ``DB_PATH``, run their
statement(s), and close the connection again, even when a statement fails.
"""

import sqlite3
from contextlib import closing, contextmanager
from typing import Iterator, Optional

DB_PATH = '/app/data/messages.db'


@contextmanager
def _connect() -> Iterator[sqlite3.Connection]:
    """Yield a connection to ``DB_PATH``; commit on success, always close."""
    # ``closing`` guarantees ``close()``; the inner ``with conn`` only handles
    # the transaction (commit on success, rollback on exception) and does NOT
    # close the connection by itself.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            yield conn


def init_db():
    """Create the ``messages`` table and its indexes if they do not exist.

    Safe to call repeatedly: every statement uses ``IF NOT EXISTS``.
    """
    with _connect() as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id INTEGER NOT NULL,
                user_id INTEGER NOT NULL,
                username TEXT,
                text TEXT NOT NULL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.execute(
            'CREATE INDEX IF NOT EXISTS idx_chat ON messages(chat_id)'
        )
        conn.execute(
            'CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp)'
        )


def save_message(chat_id: int, user_id: int, username: Optional[str], text: str):
    """Insert one message row.

    Args:
        chat_id: Chat the message belongs to.
        user_id: Author of the message.
        username: Author's display name; may be ``None`` (column is nullable).
        text: Message body; the column is ``NOT NULL``.

    Raises:
        sqlite3.Error: If the insert fails (e.g. ``text`` is ``None`` or the
            table has not been created yet). The connection is still closed.
    """
    with _connect() as conn:
        conn.execute(
            '''
            INSERT INTO messages (chat_id, user_id, username, text)
            VALUES (?, ?, ?, ?)
            ''',
            (chat_id, user_id, username, text),
        )


def get_recent_messages(chat_id: int, limit: int = 1000) -> list[str]:
    """Return the text of the newest messages of a chat, newest first.

    Args:
        chat_id: Chat to read from.
        limit: Maximum number of messages to return.

    Returns:
        Message texts ordered from newest to oldest; empty if the chat has
        no stored messages.
    """
    with _connect() as conn:
        # ``id DESC`` breaks ties between rows that share a timestamp
        # (CURRENT_TIMESTAMP has one-second resolution), so the order is
        # deterministic and matches insertion order.
        rows = conn.execute(
            '''
            SELECT text FROM messages
            WHERE chat_id = ?
            ORDER BY timestamp DESC, id DESC
            LIMIT ?
            ''',
            (chat_id, limit),
        ).fetchall()
    return [text for (text,) in rows]


def get_all_messages_for_chat(chat_id: int) -> list[tuple[str, str]]:
    """Return every ``(username, text)`` pair of a chat, newest first.

    Args:
        chat_id: Chat to read from.

    Returns:
        ``(username, text)`` tuples ordered from newest to oldest. The
        username element is ``None`` for rows stored without a username.
    """
    with _connect() as conn:
        # Same tie-breaker as get_recent_messages so both readers agree.
        return conn.execute(
            '''
            SELECT username, text FROM messages
            WHERE chat_id = ?
            ORDER BY timestamp DESC, id DESC
            ''',
            (chat_id,),
        ).fetchall()


def cleanup_old_messages(chat_id: int, keep_count: int = 1000):
    """Delete all but the newest ``keep_count`` messages of one chat.

    Only rows belonging to ``chat_id`` are touched; other chats are left
    intact.

    Args:
        chat_id: Chat whose history should be trimmed.
        keep_count: Number of newest messages to keep. ``0`` removes the
            chat's whole history.
    """
    with _connect() as conn:
        # The outer ``chat_id = ?`` is essential: without it, every row of
        # every OTHER chat is "not in" this chat's keep-list and gets deleted.
        conn.execute(
            '''
            DELETE FROM messages
            WHERE chat_id = ?
              AND id NOT IN (
                SELECT id FROM messages
                WHERE chat_id = ?
                ORDER BY timestamp DESC, id DESC
                LIMIT ?
            )
            ''',
            (chat_id, chat_id, keep_count),
        )