mirror of
https://github.com/wah-su/stickerbridge.git
synced 2025-04-05 15:54:41 +00:00
add maunium stickerset creation
This commit is contained in:
parent
04e63bfcaa
commit
f4a4f0044f
5 changed files with 140 additions and 26 deletions
|
@ -75,7 +75,13 @@ class Command:
|
|||
"I am the bot that imports stickers from Telegram and upload them to Matrix rooms\n\n"
|
||||
"List of commands:\n"
|
||||
"help - Show this help message.\n"
|
||||
"import <url|pack_name> [\"import name\"] [-p | --primary] - Use this to import Telegram stickers from given link. import_name is pack_name if not provided. if -p flag is provided, pack will be uploaded as a Default Pack for this room.\n"
|
||||
"import <url|pack_name> [\"import name\"] - Use this to import a Telegram stickerpack.\n"
|
||||
"\tFlags:\n"
|
||||
"\t\t-p | --primary - Use this flag if you want to upload pack as a default pack for this room\n"
|
||||
"\t\t-j | --json - Use this flag if you want create maunium compatable json file with downloaded stickers\n"
|
||||
"\t\t-a | --artist <artist> - Use this flag if you want to include sticker pack artist to json file\n"
|
||||
"\t\t-au | --artist-url <artist_url> - Use this flag if you want to add artist url to json file\n"
|
||||
"\t\t-r | --rating <safe|questionable|explicit|s|q|e|sfw|nsfw> - Use this flag if you want add rating to json file\n"
|
||||
"preview [pack_name] - Use this to create a preview for a Telegram stickers. If pack_name is not provided, then preview is generated for a primary pack.\n"
|
||||
"\tFlags:\n"
|
||||
"\t\t-tu | --tg-url <telegram_url|telegram_shortname> - Use this flag if you want to include stickerpack url in the last message\n"
|
||||
|
@ -98,13 +104,20 @@ class Command:
|
|||
|
||||
pack_name, import_name, flags = await _parse_args(self.args)
|
||||
|
||||
isDefault = False
|
||||
if any(x in ["-p", "--primary"] for x in flags):
|
||||
isDefault = True
|
||||
# TODO?: add --help flag
|
||||
|
||||
#
|
||||
# Flags:
|
||||
# -p | --primary - Use this flag if you want to upload pack as a default pack for this room
|
||||
# -j | --json - Use this flag if you want create maunium compatable json file with downloaded stickers
|
||||
# -a | --artist <artist> - Use this flag if you want to include stickerpack artist to json file
|
||||
# -au | --artist-url <artist_url> - Use this flag if you want to add artist url to json file
|
||||
# -r | --rating <safe|questionable|explicit|s|q|e|sfw|nsfw> - Use this flag if you want add rating to json file
|
||||
#
|
||||
|
||||
reuploader = MatrixReuploader(self.client, self.room, exporter=self.tg_exporter)
|
||||
async for status in reuploader.import_stickerset_to_room(
|
||||
pack_name, import_name, isDefault
|
||||
pack_name, import_name, flags
|
||||
):
|
||||
switch = {
|
||||
MatrixReuploader.STATUS_DOWNLOADING: f"Downloading stickerpack {pack_name}...",
|
||||
|
|
|
@ -17,7 +17,8 @@ async def _parse_args(args: list) -> dict[str, str]:
|
|||
return parsed_args
|
||||
|
||||
for index, arg in enumerate(args):
|
||||
|
||||
if not arg.startswith("-"):
|
||||
continue
|
||||
if arg in ["-tu", "--tg-url", "-a", "--artist", "-au", "--artist-url", "-s", "--space", "-pu", "--preview-url"]:
|
||||
parameter = ""
|
||||
value = ""
|
||||
|
@ -52,6 +53,7 @@ async def _parse_args(args: list) -> dict[str, str]:
|
|||
parsed_args["preview_url"] = value
|
||||
if arg in ["-upd", "--update-room"]:
|
||||
parsed_args["update_room"] = True
|
||||
|
||||
return parsed_args
|
||||
|
||||
|
||||
|
|
|
@ -1,12 +1,63 @@
|
|||
import tempfile
|
||||
import os
|
||||
import json
|
||||
|
||||
from nio import MatrixRoom, AsyncClient
|
||||
|
||||
from chat_functions import has_permission, is_stickerpack_existing, send_text_to_room, upload_image, upload_stickerpack
|
||||
from sticker_types import Sticker, MatrixStickerset
|
||||
from chat_functions import has_permission, is_stickerpack_existing, upload_image, upload_stickerpack
|
||||
from sticker_types import Sticker, MatrixStickerset, MauniumStickerset
|
||||
from telegram_exporter import TelegramExporter
|
||||
|
||||
async def _parse_args(args: list) -> dict[str, str]:
|
||||
|
||||
parsed_args = {
|
||||
"default": False,
|
||||
"json": False,
|
||||
"artist" : "",
|
||||
"artist_url" : "",
|
||||
"rating" : ""
|
||||
}
|
||||
|
||||
if len(args) == 0:
|
||||
return parsed_args
|
||||
|
||||
for index, arg in enumerate(args):
|
||||
if not arg.startswith("-"):
|
||||
continue
|
||||
if arg in ["-a", "--artist", "-au", "--artist-url", "-r", "--rating"]:
|
||||
parameter = ""
|
||||
value: str = ""
|
||||
|
||||
try:
|
||||
parameter = args[index]
|
||||
value = args[index + 1]
|
||||
except IndexError:
|
||||
continue
|
||||
|
||||
if parameter in ["-r", "--rating"]:
|
||||
if value.lower() not in ["s", "safe", "sfw", "q", "questionable", 'e', 'explicit', "nsfw"]:
|
||||
continue
|
||||
if value.lower() in ["s", "safe", "sfw"]:
|
||||
value = "Safe"
|
||||
elif value.lower() in ["q", "questionable"]:
|
||||
value = "Questionable"
|
||||
elif value.lower() in ['e', 'explicit', 'nsfw']:
|
||||
value = "Explicit"
|
||||
parsed_args["rating"] = value
|
||||
elif parameter in ["-a", "--artist"]:
|
||||
parsed_args["artist"] = value
|
||||
elif parameter in ["-au", "--artist-url"]:
|
||||
if not value.startswith("http"):
|
||||
continue
|
||||
parsed_args["artist_url"] = value
|
||||
if arg in ["-p", "--primary", "-j", "--json"]:
|
||||
if arg in ["-p", "--primary"]:
|
||||
parsed_args["default"] = True
|
||||
if arg in ["-j", "--json"]:
|
||||
parsed_args["json"] = True
|
||||
|
||||
return parsed_args
|
||||
|
||||
|
||||
class MatrixReuploader:
|
||||
|
||||
|
@ -33,16 +84,15 @@ class MatrixReuploader:
|
|||
async def _has_permission_to_upload(self) -> bool:
|
||||
return await has_permission(self.client, self.room.room_id, 'state_default')
|
||||
|
||||
async def import_stickerset_to_room(self, pack_name: str, import_name: str, isDefault: bool):
|
||||
async def import_stickerset_to_room(self, pack_name: str, import_name: str, args: list[str]):
|
||||
if not await self._has_permission_to_upload():
|
||||
yield self.STATUS_NO_PERMISSION
|
||||
return
|
||||
|
||||
name = import_name
|
||||
if import_name.startswith("http"):
|
||||
name = import_name.split("/")[-1]
|
||||
parsed_args = await _parse_args(args)
|
||||
|
||||
stickerset = MatrixStickerset(name)
|
||||
stickerset = MatrixStickerset(import_name)
|
||||
json_stickerset = MauniumStickerset(import_name, pack_name, parsed_args["rating"], {"name": parsed_args["artist"], "url": parsed_args["artist_url"]}, self.room.room_id)
|
||||
if await is_stickerpack_existing(self.client, self.room.room_id, stickerset.name()):
|
||||
yield self.STATUS_PACK_EXISTS
|
||||
return
|
||||
|
@ -59,6 +109,8 @@ class MatrixReuploader:
|
|||
os.unlink(file.name)
|
||||
|
||||
stickerset.add_sticker(sticker_mxc, sticker.alt_text)
|
||||
if parsed_args["json"]:
|
||||
json_stickerset.add_sticker(sticker_mxc, sticker.alt_text, sticker.width, sticker.height, sticker.size, sticker.mimetype)
|
||||
|
||||
if not stickerset.count():
|
||||
yield self.STATUS_PACK_EMPTY
|
||||
|
@ -66,12 +118,16 @@ class MatrixReuploader:
|
|||
|
||||
yield self.STATUS_UPDATING_ROOM_STATE
|
||||
|
||||
pack_location = import_name
|
||||
if isDefault:
|
||||
pack_location = pack_name
|
||||
if parsed_args["default"]:
|
||||
pack_location = ""
|
||||
elif pack_location.startswith("http"):
|
||||
pack_location = pack_location.split("/")[-1]
|
||||
|
||||
await upload_stickerpack(self.client, self.room.room_id, stickerset, pack_location)
|
||||
|
||||
if parsed_args["json"]:
|
||||
if not os.path.exists(f"{os.getcwd()}/data/stickersets/"):
|
||||
os.mkdir(f"{os.getcwd()}/data/stickersets/")
|
||||
with open(f"{os.getcwd()}/data/stickersets/" + json_stickerset.id + ".json", "w", encoding="utf-8") as f:
|
||||
f.write(json.dumps(json_stickerset.json()))
|
||||
|
||||
yield self.STATUS_OK
|
||||
|
|
|
@ -1,9 +1,14 @@
|
|||
class Sticker:
|
||||
"""Custom type for easier transfering sticker data between functions and classes with simple lists and returns"""
|
||||
def __init__(self, image_data, alt_text: str):
|
||||
def __init__(self, image_data, alt_text: str, width: int, height: int, size: int, mimetype: str):
|
||||
self.image_data = image_data
|
||||
self.alt_text = alt_text
|
||||
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.mimetype = mimetype
|
||||
self.size = size
|
||||
|
||||
|
||||
class MatrixStickerset:
|
||||
def __init__(self, pack_name: str):
|
||||
|
@ -35,3 +40,39 @@ class MatrixStickerset:
|
|||
|
||||
def json(self):
|
||||
return self._content
|
||||
|
||||
|
||||
class MauniumStickerset:
|
||||
def __init__(self, title: str, id: str, rating: str, author: str, room_id: str):
|
||||
self.title = title
|
||||
self.id = id
|
||||
self.rating = rating
|
||||
self.author = author
|
||||
self.room_id = room_id
|
||||
self.stickers = []
|
||||
|
||||
def add_sticker(self, mxc_uri: str, alt_text: str, width: int, height: int, size: int, mimetype: str):
|
||||
self.stickers.append(
|
||||
{
|
||||
"body": alt_text,
|
||||
"info": {
|
||||
"h": height,
|
||||
"w": width,
|
||||
"size": size,
|
||||
"mimetype": mimetype,
|
||||
},
|
||||
"msgtype": "m.sticker",
|
||||
"url": mxc_uri,
|
||||
"id": mxc_uri.split("/")[-1]
|
||||
}
|
||||
)
|
||||
|
||||
def json(self):
|
||||
return {
|
||||
"title": self.title,
|
||||
"id": self.id,
|
||||
"rating": self.rating,
|
||||
"author": self.author,
|
||||
"room_id": self.room_id,
|
||||
"stickers": self.stickers
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ from PIL import Image
|
|||
from sticker_types import Sticker
|
||||
|
||||
|
||||
def _convert_image(data: bytes) -> (bytes, int, int):
|
||||
def _convert_image(data: bytes):
|
||||
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
|
||||
new_file = BytesIO()
|
||||
image.save(new_file, "png")
|
||||
|
@ -26,7 +26,7 @@ def _convert_image(data: bytes) -> (bytes, int, int):
|
|||
else:
|
||||
w = int(w / (h / 256))
|
||||
h = 256
|
||||
return new_file.getvalue(), w, h
|
||||
return new_file.getvalue(), w, h, "image/png"
|
||||
|
||||
|
||||
def _convert_animation(data: bytes, width=256, height=0):
|
||||
|
@ -45,16 +45,18 @@ def _convert_animation(data: bytes, width=256, height=0):
|
|||
|
||||
out = BytesIO()
|
||||
exporter.process(an, out)
|
||||
return out.getvalue()
|
||||
return out.getvalue(), width, height, "image/webp"
|
||||
|
||||
|
||||
def _process_sticker(document) -> Sticker:
|
||||
alt = document.attributes[1].alt
|
||||
alt: str = document.attributes[1].alt
|
||||
if document.mime_type == 'image/webp':
|
||||
data, width, height = _convert_image(document.downloaded_data_)
|
||||
if document.mime_type == 'application/x-tgsticker':
|
||||
data = _convert_animation(document.downloaded_data_)
|
||||
return Sticker(data, alt)
|
||||
data, width, height, mime_type = _convert_image(document.downloaded_data_)
|
||||
elif document.mime_type == 'application/x-tgsticker':
|
||||
data, width, height, mime_type = _convert_animation(document.downloaded_data_)
|
||||
else:
|
||||
return
|
||||
return Sticker(data, alt, width, height, document.size, mime_type)
|
||||
|
||||
|
||||
class TelegramExporter:
|
||||
|
|
Loading…
Add table
Reference in a new issue