fix: use a flag to set pack as a default for a room, instead of using import name parameter

This commit is contained in:
Kentai Radiquum 2024-12-23 19:13:03 +05:00
parent ab6da62e15
commit 4581df7208
Signed by: Radiquum
GPG key ID: 858E8EE696525EED
3 changed files with 125 additions and 11 deletions

View file

@ -2,6 +2,7 @@ from nio import AsyncClient, MatrixRoom
from chat_functions import send_text_to_room from chat_functions import send_text_to_room
from matrix_reuploader import MatrixReuploader from matrix_reuploader import MatrixReuploader
from matrix_preview import MatrixPreview
from telegram_exporter import TelegramExporter from telegram_exporter import TelegramExporter
@ -24,6 +25,8 @@ class Command:
await self._show_help() await self._show_help()
elif self.command.startswith("import"): elif self.command.startswith("import"):
await self._import_stickerpack() await self._import_stickerpack()
elif self.command.startswith("preview"):
await self._generate_preview()
else: else:
await self._unknown_command() await self._unknown_command()
@ -32,7 +35,7 @@ class Command:
"I am the bot that imports stickers from Telegram and upload them to Matrix rooms\n\n" "I am the bot that imports stickers from Telegram and upload them to Matrix rooms\n\n"
"List of commands:\n" "List of commands:\n"
"help - Show this help message.\n" "help - Show this help message.\n"
"import <url|pack_name> [import_name] - Use this to import Telegram stickers from given link. import_name is pack_name if not provided" "import <url|pack_name> [import_name] [-p | --primary] - Use this to import Telegram stickers from given link. import_name is pack_name if not provided. if -p flag is provided, pack will be uploaded as a Default Pack for this room."
) )
await send_text_to_room(self.client, self.room.room_id, text) await send_text_to_room(self.client, self.room.room_id, text)
@ -46,10 +49,18 @@ class Command:
return return
pack_name = self.args[0] pack_name = self.args[0]
import_name = self.args[1] if len(self.args) > 1 else pack_name
import_name = pack_name
if not (len(self.args) > 1 and self.args[1] in ["-p", "--primary"]):
import_name = self.args[1] if len(self.args) > 1 else pack_name
isDefault = False
if (len(self.args) > 1 and self.args[1] in ["-p", "--primary"]) or (len(self.args) > 2 and self.args[2] in ["-p", "--primary"]):
isDefault = True
reuploader = MatrixReuploader(self.client, self.room, exporter=self.tg_exporter) reuploader = MatrixReuploader(self.client, self.room, exporter=self.tg_exporter)
async for status in reuploader.import_stickerset_to_room( async for status in reuploader.import_stickerset_to_room(
pack_name, import_name pack_name, import_name, isDefault
): ):
switch = { switch = {
MatrixReuploader.STATUS_DOWNLOADING: f"Downloading stickerpack {pack_name}...", MatrixReuploader.STATUS_DOWNLOADING: f"Downloading stickerpack {pack_name}...",
@ -71,6 +82,38 @@ class Command:
text = switch.get(status, "Warning: Unknown status") text = switch.get(status, "Warning: Unknown status")
await send_text_to_room(self.client, self.room.room_id, text) await send_text_to_room(self.client, self.room.room_id, text)
async def _generate_preview(self):
await send_text_to_room(
self.client,
self.room.room_id,
f"Not implemented YET",)
return
# if not self.args:
# await send_text_to_room(
# self.client,
# self.room.room_id,
# f"You need to provide pack name. Example: !sb preview pack_name",)
# return
# pack_name = self.args[0]
# previewer = MatrixPreview(self.client, self.room)
# async for status in previewer.generate_stickerset_preview_to_room(pack_name):
# switch = {
# MatrixPreview.STATUS_OK: "Done",
# MatrixPreview.STATUS_NO_PERMISSION: (
# "I do not have permissions to update this room\n"
# "Please, give me mod 🙏"
# ),
# MatrixPreview.STATUS_PACK_NOT_EXISTS: (
# f"Stickerpack '{pack_name}' does not exists.\n"
# "Please create it first."
# ),
# MatrixPreview.STATUS_UPDATING_ROOM_STATE: f"Updating room state...",
# }
# text = switch.get(status, "Warning: Unknown status")
# await send_text_to_room(self.client, self.room.room_id, text)
async def _unknown_command(self): async def _unknown_command(self):
await send_text_to_room( await send_text_to_room(
self.client, self.client,

View file

@ -0,0 +1,69 @@
import tempfile
import os
from nio import MatrixRoom, AsyncClient
from chat_functions import has_permission, is_stickerpack_existing, send_text_to_room, upload_image, upload_stickerpack
from sticker_types import Sticker, MatrixStickerset
from telegram_exporter import TelegramExporter
class MatrixPreview:
STATUS_OK = 0
STATUS_NO_PERMISSION = 1
STATUS_PACK_NOT_EXISTS = 2
STATUS_UPDATING_ROOM_STATE = 3
def __init__(self, client: AsyncClient, room: MatrixRoom):
self.client = client
self.room = room
async def _has_permission_to_update(self) -> bool:
return await has_permission(self.client, self.room.room_id, 'state_default')
async def generate_stickerset_preview_to_room(self, pack_name: str):
if not await self._has_permission_to_update():
yield self.STATUS_NO_PERMISSION
return
name = pack_name
if pack_name.lower() == "default":
name = ""
if not await is_stickerpack_existing(self.client, self.room.room_id, name):
yield self.STATUS_PACK_NOT_EXISTS
return
stickerpack = await self.client.room_get_state_event(self.room.room_id, 'im.ponies.room_emotes', name)
# {'pack': {'display_name': 'https://t.me/addstickers/kentai_radiquum'}, 'images': {'🤗': {'url': 'mxc://wah.su/OzamJbZNgcIIDeMXofMnmkBO', 'usage': ['sticker']}, '🤗-1': {...}}}
print(stickerpack)
# yield self.STATUS_DOWNLOADING
# converted_stickerset = await self.exporter.get_stickerset(pack_name)
# yield self.STATUS_UPLOADING
# for sticker in converted_stickerset:
# with tempfile.NamedTemporaryFile('w+b', delete=False) as file:
# file.write(sticker.image_data)
# sticker_mxc = await upload_image(self.client, file.name)
# file.close()
# os.unlink(file.name)
# stickerset.add_sticker(sticker_mxc, sticker.alt_text)
# if not stickerset.count():
# yield self.STATUS_PACK_EMPTY
# return
yield self.STATUS_UPDATING_ROOM_STATE
# await upload_stickerpack(self.client, self.room.room_id, stickerset, name)
yield self.STATUS_OK

View file

@ -33,12 +33,16 @@ class MatrixReuploader:
async def _has_permission_to_upload(self) -> bool: async def _has_permission_to_upload(self) -> bool:
return await has_permission(self.client, self.room.room_id, 'state_default') return await has_permission(self.client, self.room.room_id, 'state_default')
async def import_stickerset_to_room(self, pack_name: str, import_name: str): async def import_stickerset_to_room(self, pack_name: str, import_name: str, isDefault: bool):
if not await self._has_permission_to_upload(): if not await self._has_permission_to_upload():
yield self.STATUS_NO_PERMISSION yield self.STATUS_NO_PERMISSION
return return
stickerset = MatrixStickerset(import_name) name = import_name
if import_name.startswith("http"):
name = import_name.split("/")[-1]
stickerset = MatrixStickerset(name)
if await is_stickerpack_existing(self.client, self.room.room_id, stickerset.name()): if await is_stickerpack_existing(self.client, self.room.room_id, stickerset.name()):
yield self.STATUS_PACK_EXISTS yield self.STATUS_PACK_EXISTS
return return
@ -62,12 +66,10 @@ class MatrixReuploader:
yield self.STATUS_UPDATING_ROOM_STATE yield self.STATUS_UPDATING_ROOM_STATE
name = stickerset.name() pack_location = import_name
if import_name.lower() == "default": if isDefault:
name = "" pack_location = ""
elif import_name.startswith("http"):
name = import_name.split("/")[-1]
await upload_stickerpack(self.client, self.room.room_id, stickerset, name) await upload_stickerpack(self.client, self.room.room_id, stickerset, pack_location)
yield self.STATUS_OK yield self.STATUS_OK