# Source viewer header: 132 lines, 5.4 KiB, Python, no trailing EOL.
import telebot
|
||
import datetime
|
||
import sqlite3
|
||
import re
|
||
class Database:
    """SQLite-backed storage for bot users and the messages they send.

    Every operation opens its own connection and closes it when done, so an
    instance keeps no open handles between calls.
    """

    def __init__(self, db_name) -> None:
        """Remember the database path and make sure the schema exists.

        Args:
            db_name: Path of the SQLite database file.
        """
        self.db_name = db_name
        self.__create_table()

    def __create_table(self):
        """Create the ``users`` and ``messages`` tables if they are missing."""
        sql = self.connect_db()
        try:
            sql["cursor"].execute("""
                create table if not exists users (
                    id integer primary key autoincrement,
                    id_telegram integer not null unique,
                    username text,
                    first_name text,
                    last_name text,
                    date_register date,
                    access boolean default 1
                )
            """)
            sql["cursor"].execute("""
                create table if not exists messages (
                    id integer primary key autoincrement,
                    id_user integer not null,
                    message_id integer not null,
                    message_text text not null,
                    date_send date,
                    status boolean default 0 check(status in (0,1)),
                    foreign key (id_user) references users(id)
                )
            """)
        finally:
            self.close(sql["connect"], sql["cursor"])

    def connect_db(self):
        """Open a new connection to the database file.

        Returns:
            A dict with the open connection under ``"connect"`` and a cursor
            created from it under ``"cursor"``. The caller is responsible for
            passing both to :meth:`close`.
        """
        # No ``with`` block here: sqlite3's connection context manager only
        # commits or rolls back, it never closes the connection.
        connect = sqlite3.connect(self.db_name)
        return {"connect": connect, "cursor": connect.cursor()}

    def check_user(self, user_id: int):
        """Look up a user by Telegram id.

        Args:
            user_id: Value matched against the ``id_telegram`` column.

        Returns:
            ``{"status": True, "info_user": row}`` when a row exists, where
            ``row`` is the full ``users`` tuple. When no row exists,
            ``"status"`` is ``False`` and ``"info_user"`` is ``None``.
        """
        sql = self.connect_db()
        try:
            sql["cursor"].execute(
                "select * from users where id_telegram = ?", (user_id,)
            )
            info_user = sql["cursor"].fetchone()
        finally:
            self.close(sql["connect"], sql["cursor"])

        if info_user is None:
            # "info_users" is the key this branch historically returned; it is
            # kept next to the consistent "info_user" key so existing readers
            # of either spelling keep working.
            return {"status": False, "info_user": None, "info_users": None}
        return {"status": True, "info_user": info_user}

    # The telebot annotations below are quoted so they are not evaluated when
    # the class body is executed.
    def create_user(self, message: "telebot.types.Message"):
        """Insert the sender of ``message`` into ``users``.

        Args:
            message: Incoming message; ``from_user.id``, ``username``,
                ``first_name`` and ``last_name`` are stored together with
                today's date.

        Raises:
            sqlite3.IntegrityError: If a user with the same Telegram id is
                already stored (``id_telegram`` is unique).
        """
        sender = message.from_user
        date = datetime.datetime.now().strftime("%Y-%m-%d")
        sql = self.connect_db()
        try:
            sql["cursor"].execute(
                """
                insert into users (id_telegram, username, first_name, last_name, date_register) values (?,?,?,?,?)
                """,
                (sender.id, sender.username, sender.first_name, sender.last_name, date),
            )
            sql["connect"].commit()
        finally:
            self.close(sql["connect"], sql["cursor"])

    def create_message(self, message: "telebot.types.Message", info_user):
        """Store ``message`` for a user and return the new row id.

        Args:
            message: Incoming message; its ``message_id`` and ``text`` are
                stored together with today's date.
            info_user: A ``users`` row as returned by :meth:`check_user`;
                element 0 (the ``users.id`` primary key) is used as the owner.

        Returns:
            The ``messages.id`` of the inserted row.
        """
        date = datetime.datetime.now().strftime("%Y-%m-%d")
        sql = self.connect_db()
        try:
            sql["cursor"].execute(
                """
                insert into messages (id_user, message_id, message_text, date_send) values (?,?,?,?)
                """,
                (info_user[0], message.message_id, message.text, date),
            )
            sql["connect"].commit()
            # Read the generated id while the cursor is still open instead of
            # relying on attribute access to a closed cursor.
            new_id = sql["cursor"].lastrowid
        finally:
            self.close(sql["connect"], sql["cursor"])
        return new_id

    def check_application(self, id_application) -> "int | None":
        """Return the Telegram message id stored for an application.

        Args:
            id_application: Value matched against ``messages.id``.

        Returns:
            The stored ``message_id``, or ``None`` when no such row exists.
        """
        sql = self.connect_db()
        try:
            sql["cursor"].execute(
                """
                select message_id from messages where id = ?
                """,
                (id_application,),
            )
            data_message = sql["cursor"].fetchone()
        finally:
            self.close(sql["connect"], sql["cursor"])

        if data_message is None:
            return None
        return data_message[0]

    def close(self, connect, cursor):
        """Close ``cursor`` and then the ``connect`` connection it belongs to."""
        cursor.close()
        connect.close()
|
||
|
||
|
||
class TelegramBot(Database):
    """Telegram bot that relays user messages to an admin chat and back.

    Messages sent to the bot outside the admin chat are stored as numbered
    applications and forwarded to the admin chat. When an admin replies to a
    forwarded application, the reply text is sent back to the user who wrote
    it.
    """

    # Admin chat used when the caller does not pass one explicitly.
    DEFAULT_ADMIN_CHAT_ID = -4235588549

    # Patterns that pull the ids back out of the text forwarded to the admins.
    _APPLICATION_RE = re.compile(r"Номер заявки: (\d+)")
    _USER_RE = re.compile(r"ID пользователя: (\d+)")

    def __init__(self, db_name, token, admin_chat_id=DEFAULT_ADMIN_CHAT_ID):
        """Set up storage and the bot client, then start handling updates.

        Note that this call blocks: it ends in :meth:`router`, which starts
        polling.

        Args:
            db_name: Path of the SQLite database file.
            token: Telegram bot API token.
            admin_chat_id: Chat that receives applications and from which
                admin replies are accepted.
        """
        super().__init__(db_name)
        self.bot = telebot.TeleBot(token)
        self.admin_chat_id = admin_chat_id
        self.router()

    def router(self):
        """Register the message handlers and start polling (blocking)."""

        @self.bot.message_handler(commands=['start'])
        def start(message: telebot.types.Message):
            # Greet returning users differently and register new ones.
            if self.check_user(message.from_user.id)["status"]:
                text = f"С возвращением, {message.from_user.first_name}"
            else:
                self.create_user(message)
                text = f"Добро пожаловать, {message.from_user.first_name}"
            self.bot.send_message(message.chat.id, text)

        @self.bot.message_handler(func=lambda message: True)
        def echo_all(message: telebot.types.Message):
            if message.chat.id == self.admin_chat_id:
                self._relay_admin_reply(message)
            else:
                self._register_application(message)

        self.bot.polling()

    def _relay_admin_reply(self, message: telebot.types.Message):
        """Send an admin's reply back to the author of the application.

        Messages from the admin chat that are not replies to a forwarded
        application are ignored; otherwise ordinary admin conversation would
        be registered as new applications and echoed into the same chat.
        """
        replied = message.reply_to_message
        if replied is None or not replied.text:
            # Not a reply, or a reply to something without text (e.g. media).
            return

        application = self._APPLICATION_RE.search(replied.text)
        user = self._USER_RE.search(replied.text)
        if application is None or user is None:
            # The replied-to message does not carry the application header.
            return

        try:
            current_message = self.check_application(int(application.group(1)))
        except TypeError:
            # No stored row for this application number: still deliver the
            # answer, just without anchoring it to the original message.
            current_message = None

        self.bot.send_message(
            int(user.group(1)),
            f"Ответ от админа: {message.text}",
            reply_to_message_id=current_message,
        )

    def _register_application(self, message: telebot.types.Message):
        """Store a user's message and forward it to the admin chat."""
        check = self.check_user(message.from_user.id)
        if not check["status"]:
            # First contact without /start: register the user, then re-read
            # the row so its primary key is available.
            self.create_user(message)
            check = self.check_user(message.from_user.id)

        id_message = self.create_message(message, check["info_user"])
        self.bot.reply_to(message, "Сообщение отправлено админу")

        # The header lines below are parsed back by _relay_admin_reply.
        text = (
            "\n"
            f"Номер заявки: {id_message}\n"
            f"ID пользователя: {message.from_user.id}\n"
            f"Сообщение: {message.text}\n"
        )
        self.bot.send_message(self.admin_chat_id, text)
|
||
|
||
# Start the bot only when the file is executed as a script, so importing the
# module does not begin polling.
if __name__ == "__main__":
    import os

    # The token is a secret and must not live in source control. Any token
    # that was previously committed here should be treated as compromised and
    # revoked.
    bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
    if not bot_token:
        raise SystemExit(
            "Set the TELEGRAM_BOT_TOKEN environment variable to the bot token."
        )

    TelegramBot(
        db_name="tg.db",
        token=bot_token,
    )