add sticker pack updating and stickers hash checking to not reupload already uploaded stickers

This commit is contained in:
Kentai Radiquum 2025-01-18 02:12:28 +05:00
parent a614c97d8e
commit 6b603ee3c7
Signed by: Radiquum
GPG key ID: 858E8EE696525EED
5 changed files with 65 additions and 24 deletions

View file

@ -136,6 +136,9 @@ class Command:
f"Stickerpack '{pack_name}' already exists.\n"
"Please delete it first."
),
MatrixReuploader.STATUS_PACK_UPDATE: (
f"Updating Stickerpack '{pack_name}'.\n"
),
MatrixReuploader.STATUS_PACK_EMPTY: (
f"Warning: Telegram pack {pack_name} find out empty or not existing."
),

View file

@ -1,4 +1,5 @@
import os
from typing import Union
import aiofiles.os
import magic
@ -83,15 +84,17 @@ async def update_room_name(client: AsyncClient, room_id: str, name: str):
async def update_room_topic(client: AsyncClient, room_id: str, topic: str):
return await client.room_put_state(room_id, 'm.room.topic', {"topic": topic})
async def upload_image(client: AsyncClient, image: str):
async def upload_image(client: AsyncClient, image: str, name: Union[str, None] = None):
mime_type = magic.from_file(image, mime=True)
file_stat = await aiofiles.os.stat(image)
if name is None:
name = os.path.basename(image)
async with aiofiles.open(image, "r+b") as f:
try:
resp, maybe_keys = await client.upload(
f,
content_type=mime_type,
filename=os.path.basename(image),
filename=name,
filesize=file_stat.st_size,
)
except:

View file

@ -2,10 +2,11 @@ import tempfile
import os
import json
import yaml
import hashlib
from nio import MatrixRoom, AsyncClient
from chat_functions import has_permission, is_stickerpack_existing, upload_image, upload_stickerpack
from chat_functions import has_permission, is_stickerpack_existing, get_stickerpack, upload_image, upload_stickerpack
from sticker_types import Sticker, MatrixStickerset, MauniumStickerset
from telegram_exporter import TelegramExporter
@ -20,7 +21,8 @@ async def _parse_args(args: list) -> dict[str, str]:
"json": config_params['import']['save_json'] or False,
"artist" : None,
"artist_url" : None,
"rating" : None
"rating" : None,
"update_pack": config_params['import']['update_pack'] or False
}
if len(args) == 0:
@ -60,6 +62,8 @@ async def _parse_args(args: list) -> dict[str, str]:
parsed_args["default"] = not parsed_args["default"]
if arg in ["-j", "--json"]:
parsed_args["json"] = not parsed_args["json"]
if arg in ["-u", "--upd"]:
parsed_args["json"] = not parsed_args["json"]
return parsed_args
@ -74,6 +78,7 @@ class MatrixReuploader:
STATUS_DOWNLOADING = 4
STATUS_UPLOADING = 5
STATUS_UPDATING_ROOM_STATE = 6
STATUS_PACK_UPDATE = 7
def __init__(self, client: AsyncClient, room: MatrixRoom, exporter: TelegramExporter = None,
pack: list[Sticker] = None):
@ -96,24 +101,56 @@ class MatrixReuploader:
parsed_args = await _parse_args(args)
stickerset = MatrixStickerset(import_name, pack_name, parsed_args["rating"], {"name": parsed_args["artist"], "url": parsed_args["artist_url"]})
json_stickerset = MauniumStickerset(import_name, pack_name, parsed_args["rating"], {"name": parsed_args["artist"], "url": parsed_args["artist_url"]}, self.room.room_id)
if await is_stickerpack_existing(self.client, self.room.room_id, stickerset.name()):
yield self.STATUS_PACK_EXISTS
return
pack_location = pack_name
if parsed_args["default"]:
pack_location = ""
exists = await is_stickerpack_existing(self.client, self.room.room_id, pack_location)
stickerpack = None;
if exists:
if parsed_args["update_pack"]:
stickerpack = await get_stickerpack(self.client, self.room.room_id, pack_location)
if parsed_args["rating"] is None:
parsed_args["rating"] = stickerpack["pack"].get("rating", None)
if parsed_args["artist"] is None and stickerpack["pack"].get("artist", None) is not None:
parsed_args["artist"] = stickerpack["pack"]["artist"].get("name", None)
if parsed_args["artist_url"] is None and stickerpack["pack"].get("url", None) is not None:
parsed_args["artist_url"] = stickerpack["pack"]["artist"].get("url", None)
yield self.STATUS_PACK_UPDATE
else:
yield self.STATUS_PACK_EXISTS
return
yield self.STATUS_DOWNLOADING
converted_stickerset = await self.exporter.get_stickerset(pack_name)
converted_stickerset = await self.exporter.get_stickerset(import_name)
yield self.STATUS_UPLOADING
stickerset = MatrixStickerset(import_name, pack_name, parsed_args["rating"], {"name": parsed_args["artist"], "url": parsed_args["artist_url"]})
json_stickerset = MauniumStickerset(import_name, pack_name, parsed_args["rating"], {"name": parsed_args["artist"], "url": parsed_args["artist_url"]}, self.room.room_id)
n = 0
for sticker in converted_stickerset:
with tempfile.NamedTemporaryFile('w+b', delete=False) as file:
file.write(sticker.image_data)
sticker_mxc = await upload_image(self.client, file.name)
hash = hashlib.md5(sticker.image_data).hexdigest()
name = f"{pack_name}__{sticker.alt_text}__{os.path.basename(file.name)}"
sticker_mxc = None
if stickerpack is not None and stickerpack.get('images', None) is not None:
for stick in stickerpack['images'].values():
if stick.get('hash', None) is not None and stick["hash"] == hash:
sticker_mxc = stick["url"]
print(f"sticker already exists, hash: {hash}")
break
if sticker_mxc is None:
sticker_mxc = await upload_image(self.client, file.name, name)
file.close()
os.unlink(file.name)
stickerset.add_sticker(sticker_mxc, sticker.alt_text)
stickerset.add_sticker(sticker_mxc, sticker.alt_text, hash)
n += 1
print(f"Uploaded: {n}/{len(converted_stickerset)}")
if parsed_args["json"]:
json_stickerset.add_sticker(sticker_mxc, sticker.alt_text, sticker.width, sticker.height, sticker.size, sticker.mimetype)
@ -123,10 +160,6 @@ class MatrixReuploader:
yield self.STATUS_UPDATING_ROOM_STATE
pack_location = pack_name
if parsed_args["default"]:
pack_location = ""
await upload_stickerpack(self.client, self.room.room_id, stickerset, pack_location)
if parsed_args["json"]:

View file

@ -22,17 +22,18 @@ class MatrixStickerset:
"images": {}
}
def add_sticker(self, mxc_uri: str, alt_text: str):
def add_sticker(self, mxc_uri: str, alt_text: str, hash=""):
if alt_text in self._content['images']:
duplicate_counter = 1
alt_text = alt_text + '-' + str(duplicate_counter)
while (alt_text in self._content['images']):
duplicate_counter += 1
alt_text = alt_text.split('-')[0] + '-' + str(duplicate_counter)
print(alt_text)
print(alt_text)
self._content['images'][alt_text] = {
"url": mxc_uri,
"usage": ["sticker"]
"usage": ["sticker"],
"hash": hash
}
def count(self):

View file

@ -74,20 +74,21 @@ class TelegramExporter:
async def get_stickerset(self, pack_name: str) -> list[Sticker]:
result: List[Sticker] = list()
short_name = pack_name
if short_name.startswith('http'):
short_name = pack_name.split("/")[-1]
try:
sticker_set = await self.client(GetStickerSetRequest(InputStickerSetShortName(short_name=short_name), hash=0))
sticker_set = await self.client(GetStickerSetRequest(InputStickerSetShortName(short_name=pack_name), hash=0))
except StickersetInvalidError:
return result # return empty on fail
downloaded_documents = []
n = 0
for document_data in sticker_set.documents:
document_data.downloaded_data_ = await self.client.download_media(document_data, file=bytes)
downloaded_documents.append(document_data)
n += 1
print(f"Downloaded: {n}/{len(sticker_set.documents)}")
print("Processing stickers...")
pool = Pool()
result = pool.map(_process_sticker, downloaded_documents)