mirror of
https://github.com/wah-su/stickerbridge.git
synced 2025-04-05 15:54:41 +00:00
Initial commit
This commit is contained in:
commit
e554fc5937
18 changed files with 501 additions and 0 deletions
5
.directory
Normal file
5
.directory
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
[Dolphin]
|
||||||
|
SortOrder=1
|
||||||
|
SortRole=creationtime
|
||||||
|
Timestamp=2022,9,6,22,9,56.222
|
||||||
|
Version=4
|
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
config.yaml
|
||||||
|
data/*
|
||||||
|
telegram_secrets.session
|
3
.idea/.gitignore
generated
vendored
Normal file
3
.idea/.gitignore
generated
vendored
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
# Default ignored files
|
||||||
|
/shelf/
|
||||||
|
/workspace.xml
|
6
.idea/inspectionProfiles/profiles_settings.xml
generated
Normal file
6
.idea/inspectionProfiles/profiles_settings.xml
generated
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
<component name="InspectionProjectProfileManager">
|
||||||
|
<settings>
|
||||||
|
<option name="USE_PROJECT_PROFILE" value="false" />
|
||||||
|
<version value="1.0" />
|
||||||
|
</settings>
|
||||||
|
</component>
|
4
.idea/misc.xml
generated
Normal file
4
.idea/misc.xml
generated
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (stickerbridge) (3)" project-jdk-type="Python SDK" />
|
||||||
|
</project>
|
8
.idea/modules.xml
generated
Normal file
8
.idea/modules.xml
generated
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="ProjectModuleManager">
|
||||||
|
<modules>
|
||||||
|
<module fileurl="file://$PROJECT_DIR$/.idea/stickerbridge.iml" filepath="$PROJECT_DIR$/.idea/stickerbridge.iml" />
|
||||||
|
</modules>
|
||||||
|
</component>
|
||||||
|
</project>
|
10
.idea/stickerbridge.iml
generated
Normal file
10
.idea/stickerbridge.iml
generated
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<module type="PYTHON_MODULE" version="4">
|
||||||
|
<component name="NewModuleRootManager">
|
||||||
|
<content url="file://$MODULE_DIR$">
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/venv" />
|
||||||
|
</content>
|
||||||
|
<orderEntry type="jdk" jdkName="Python 3.10 (stickerbridge) (3)" jdkType="Python SDK" />
|
||||||
|
<orderEntry type="sourceFolder" forTests="false" />
|
||||||
|
</component>
|
||||||
|
</module>
|
6
.idea/vcs.xml
generated
Normal file
6
.idea/vcs.xml
generated
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="VcsDirectoryMappings">
|
||||||
|
<mapping directory="$PROJECT_DIR$" vcs="Git" />
|
||||||
|
</component>
|
||||||
|
</project>
|
BIN
avatar.png
Normal file
BIN
avatar.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 40 KiB |
80
bot_commands.py
Normal file
80
bot_commands.py
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
from nio import AsyncClient, MatrixRoom
|
||||||
|
|
||||||
|
from chat_functions import send_text_to_room, upload_image, upload_stickerpack, is_stickerpack_existing, has_permission
|
||||||
|
from matrix_reuploader import MatrixReuploader
|
||||||
|
from sticker_types import MatrixStickerset
|
||||||
|
from telegram_exporter import TelegramExporter
|
||||||
|
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
|
||||||
|
class Command:
|
||||||
|
def __init__(self, client: AsyncClient, room: MatrixRoom, command: str, tg_exporter: TelegramExporter):
|
||||||
|
self.client = client
|
||||||
|
self.room = room
|
||||||
|
self.command = command.lower()
|
||||||
|
self.tg_exporter = tg_exporter
|
||||||
|
self.args = self.command.split()[1:]
|
||||||
|
|
||||||
|
async def process(self):
|
||||||
|
if self.command.startswith("help"):
|
||||||
|
await self._show_help()
|
||||||
|
elif self.command.startswith("import"):
|
||||||
|
await self._import_stickerpack()
|
||||||
|
else:
|
||||||
|
await self._unknown_command()
|
||||||
|
|
||||||
|
async def _show_help(self):
|
||||||
|
text = (
|
||||||
|
"I am the bot that imports stickers from Telegram and upload them to Matrix rooms\n\n"
|
||||||
|
"List of commands:\n"
|
||||||
|
"help - Show this help message.\n"
|
||||||
|
"import <pack_name> - Use this to import Telegram stickers from given link"
|
||||||
|
)
|
||||||
|
await send_text_to_room(self.client, self.room.room_id, text)
|
||||||
|
|
||||||
|
async def _import_stickerpack(self):
|
||||||
|
|
||||||
|
if not self.args:
|
||||||
|
text = (
|
||||||
|
"You need to enter stickerpack name.\n"
|
||||||
|
"Type command 'help' for more information."
|
||||||
|
)
|
||||||
|
await send_text_to_room(self.client, self.room.room_id, text)
|
||||||
|
return
|
||||||
|
|
||||||
|
pack_name = self.args[0]
|
||||||
|
reuploader = MatrixReuploader(self.client, self.room, exporter=self.tg_exporter)
|
||||||
|
async for status in reuploader.import_stickerset_to_room(pack_name):
|
||||||
|
if status == MatrixReuploader.STATUS_DOWNLOADING:
|
||||||
|
text = f'Downloading stickerpack {pack_name}...'
|
||||||
|
if status == MatrixReuploader.STATUS_UPLOADING:
|
||||||
|
text = f'Uploading stickerpack {pack_name}...'
|
||||||
|
if status == MatrixReuploader.STATUS_UPDATING_ROOM_STATE:
|
||||||
|
text = f'Updating room state...️'
|
||||||
|
await send_text_to_room(self.client, self.room.room_id, text)
|
||||||
|
|
||||||
|
if reuploader.result == MatrixReuploader.RESULT_OK:
|
||||||
|
text = 'Done 😄'
|
||||||
|
if reuploader.result == MatrixReuploader.RESULT_NO_PERMISSION:
|
||||||
|
text = (
|
||||||
|
'I do not have permissions to create any stickerpack in this room\n'
|
||||||
|
'Please, give me mod 🙏'
|
||||||
|
)
|
||||||
|
if reuploader.result == MatrixReuploader.RESULT_PACK_EXISTS:
|
||||||
|
text = (
|
||||||
|
f"Stickerpack '{pack_name}' already exists.\n"
|
||||||
|
'Please delete it first.'
|
||||||
|
)
|
||||||
|
if reuploader.result == MatrixReuploader.RESULT_PACK_EMPTY:
|
||||||
|
text = (
|
||||||
|
f'Warning: Telegram pack {pack_name} find out empty or not existing.'
|
||||||
|
)
|
||||||
|
await send_text_to_room(self.client, self.room.room_id, text)
|
||||||
|
|
||||||
|
async def _unknown_command(self):
|
||||||
|
await send_text_to_room(
|
||||||
|
self.client,
|
||||||
|
self.room.room_id,
|
||||||
|
f"Unknown command '{self.command}'. Try the 'help' command for more information.",
|
||||||
|
)
|
49
callbacks.py
Normal file
49
callbacks.py
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
import logging
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
from nio import AsyncClient, MatrixRoom, RoomMessageText, InviteEvent, InviteMemberEvent
|
||||||
|
|
||||||
|
from bot_commands import Command
|
||||||
|
from chat_functions import send_text_to_room
|
||||||
|
from telegram_exporter import TelegramExporter
|
||||||
|
|
||||||
|
|
||||||
|
class Callbacks:
|
||||||
|
def __init__(self, client: AsyncClient, command_prefix: str, config: dict, tg_exporter: TelegramExporter):
|
||||||
|
self.client = client
|
||||||
|
self.command_prefix = command_prefix
|
||||||
|
self.config = config
|
||||||
|
self.tg_exporter = tg_exporter
|
||||||
|
|
||||||
|
async def sync(self, response):
|
||||||
|
with open('data/next_batch', 'w') as next_batch_token:
|
||||||
|
next_batch_token.write(response.next_batch)
|
||||||
|
|
||||||
|
async def message(self, room: MatrixRoom, event: RoomMessageText) -> None:
|
||||||
|
|
||||||
|
# Ignore messages from ourselves
|
||||||
|
if event.sender == self.client.user:
|
||||||
|
return
|
||||||
|
|
||||||
|
if event.body.startswith(self.command_prefix) or room.member_count <= 2:
|
||||||
|
command_string = event.body.replace(self.command_prefix, '').strip()
|
||||||
|
command = Command(self.client, room, command_string, self.tg_exporter)
|
||||||
|
try:
|
||||||
|
await command.process()
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(traceback.format_exc())
|
||||||
|
await send_text_to_room(self.client, room.room_id, 'Sorry, there was an internal error:\n' + str(e))
|
||||||
|
|
||||||
|
async def autojoin_room(self, room: MatrixRoom, event: InviteMemberEvent):
|
||||||
|
|
||||||
|
# Only react to invites for us
|
||||||
|
if not event.state_key == self.client.user_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.client.join(room.room_id)
|
||||||
|
text = (
|
||||||
|
f"Hi, I'm a {self.config['matrix_bot_name']}.\n"
|
||||||
|
"Type '!sb help' to display available commands.\n\n"
|
||||||
|
"Please do note this bot would not work in encrypted rooms."
|
||||||
|
)
|
||||||
|
await send_text_to_room(self.client, room.room_id, text)
|
75
chat_functions.py
Normal file
75
chat_functions.py
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
import aiofiles.os
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from nio import AsyncClient, UploadResponse, ErrorResponse, RoomGetStateEventError
|
||||||
|
|
||||||
|
from sticker_types import MatrixStickerset
|
||||||
|
|
||||||
|
|
||||||
|
async def send_text_to_room(client: AsyncClient, room_id: str, message: str):
|
||||||
|
content = {
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": message,
|
||||||
|
}
|
||||||
|
return await client.room_send(
|
||||||
|
room_id,
|
||||||
|
"m.room.message",
|
||||||
|
content,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def has_permission(client: AsyncClient, room_id: str, permission_type: str):
|
||||||
|
"""Reimplementation of AsyncClient.has_permission because matrix-nio version always gives an error
|
||||||
|
https://github.com/poljar/matrix-nio/issues/324"""
|
||||||
|
user_id = client.user
|
||||||
|
power_levels = await client.room_get_state_event(room_id, "m.room.power_levels")
|
||||||
|
try:
|
||||||
|
user_power_level = power_levels.content['users'][user_id]
|
||||||
|
except KeyError:
|
||||||
|
try:
|
||||||
|
user_power_level = power_levels.content['users_default']
|
||||||
|
except KeyError:
|
||||||
|
return ErrorResponse("Couldn't get user power levels")
|
||||||
|
|
||||||
|
try:
|
||||||
|
permission_power_level = power_levels.content[permission_type]
|
||||||
|
except KeyError:
|
||||||
|
return ErrorResponse(f"permission_type {permission_type} unknown")
|
||||||
|
|
||||||
|
return user_power_level >= permission_power_level
|
||||||
|
|
||||||
|
|
||||||
|
async def is_stickerpack_existing(client: AsyncClient, room_id: str, pack_name: str):
|
||||||
|
response = (await client.room_get_state_event(room_id, 'im.ponies.room_emotes', pack_name))
|
||||||
|
if isinstance(response, RoomGetStateEventError) and response.status_code == 'M_NOT_FOUND':
|
||||||
|
return False
|
||||||
|
return not response.content == {}
|
||||||
|
|
||||||
|
|
||||||
|
async def upload_stickerpack(client: AsyncClient, room_id: str, stickerset: MatrixStickerset):
|
||||||
|
return await client.room_put_state(room_id, 'im.ponies.room_emotes', stickerset.json(), state_key=stickerset.name())
|
||||||
|
|
||||||
|
|
||||||
|
async def upload_image(client: AsyncClient, image: str, mime_type: str):
|
||||||
|
file_stat = await aiofiles.os.stat(image)
|
||||||
|
async with aiofiles.open(image, "r+b") as f:
|
||||||
|
resp, maybe_keys = await client.upload(
|
||||||
|
f,
|
||||||
|
content_type=mime_type,
|
||||||
|
filename=os.path.basename(image),
|
||||||
|
filesize=file_stat.st_size,
|
||||||
|
)
|
||||||
|
if isinstance(resp, UploadResponse):
|
||||||
|
logging.debug(f"Image {image} was uploaded successfully to server.")
|
||||||
|
return resp.content_uri
|
||||||
|
else:
|
||||||
|
logging.error(f"Failed to upload image ({image}). Failure response: {resp}")
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
async def upload_avatar(client: AsyncClient, image: str):
|
||||||
|
avatar_mxc = await upload_image(client, image)
|
||||||
|
if avatar_mxc:
|
||||||
|
await client.set_avatar(avatar_mxc)
|
13
config.yaml.example
Normal file
13
config.yaml.example
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
command_prefix: "!sb"
|
||||||
|
|
||||||
|
matrix_homeserver: "https://matrix.org"
|
||||||
|
matrix_username: "@username:example.com"
|
||||||
|
matrix_password: "password"
|
||||||
|
|
||||||
|
matrix_bot_name: "Telegram stickers bot"
|
||||||
|
|
||||||
|
telegram_api_id: 1234567
|
||||||
|
telegram_api_hash: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
|
||||||
|
telegram_bot_token: "1234567890:aaaaaaaaaaaaaaaaaaaaaa--aaaaaaaaaaa"
|
||||||
|
|
||||||
|
log_level: INFO
|
52
main.py
Normal file
52
main.py
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from nio import AsyncClient, SyncResponse, RoomMessageText, InviteEvent, InviteMemberEvent
|
||||||
|
|
||||||
|
from callbacks import Callbacks
|
||||||
|
from chat_functions import upload_avatar
|
||||||
|
from telegram_exporter import TelegramExporter
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
if not os.path.exists('config.yaml'):
|
||||||
|
shutil.copy('config.yaml.example', 'config.yaml')
|
||||||
|
logging.warning('Please fill in config.yaml file, then restart the bot')
|
||||||
|
return
|
||||||
|
with open("config.yaml", 'r') as config_file:
|
||||||
|
config = yaml.safe_load(config_file)
|
||||||
|
|
||||||
|
logging.basicConfig(level=os.environ.get("LOGLEVEL", config['log_level']))
|
||||||
|
|
||||||
|
client = AsyncClient(config['matrix_homeserver'], config['matrix_username'])
|
||||||
|
client.device_id = config['matrix_bot_name']
|
||||||
|
|
||||||
|
tg_exporter = TelegramExporter(config['telegram_api_id'], config['telegram_api_hash'], config['telegram_bot_token'],
|
||||||
|
'data/telegram_secrets')
|
||||||
|
await tg_exporter.connect()
|
||||||
|
|
||||||
|
callbacks = Callbacks(client, config['command_prefix'], config, tg_exporter)
|
||||||
|
client.add_response_callback(callbacks.sync, SyncResponse)
|
||||||
|
client.add_event_callback(callbacks.message, RoomMessageText)
|
||||||
|
client.add_event_callback(callbacks.autojoin_room, InviteMemberEvent)
|
||||||
|
|
||||||
|
login_response = await client.login(config['matrix_password'])
|
||||||
|
logging.info(login_response)
|
||||||
|
|
||||||
|
if os.path.exists('data/next_batch'):
|
||||||
|
with open("data/next_batch", "r") as next_batch_token:
|
||||||
|
client.next_batch = next_batch_token.read()
|
||||||
|
else:
|
||||||
|
await upload_avatar(client, 'avatar.png')
|
||||||
|
await client.set_displayname(config['matrix_bot_name'])
|
||||||
|
|
||||||
|
await client.sync_forever(30000)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
asyncio.run(main())
|
63
matrix_reuploader.py
Normal file
63
matrix_reuploader.py
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
from nio import MatrixRoom, AsyncClient
|
||||||
|
|
||||||
|
from chat_functions import has_permission, is_stickerpack_existing, send_text_to_room, upload_image, upload_stickerpack
|
||||||
|
from sticker_types import Sticker, MatrixStickerset
|
||||||
|
from telegram_exporter import TelegramExporter
|
||||||
|
|
||||||
|
|
||||||
|
class MatrixReuploader:
|
||||||
|
|
||||||
|
RESULT_OK = 0
|
||||||
|
RESULT_NO_PERMISSION = 1
|
||||||
|
RESULT_PACK_EXISTS = 2
|
||||||
|
RESULT_PACK_EMPTY = 3
|
||||||
|
|
||||||
|
STATUS_DOWNLOADING = 1
|
||||||
|
STATUS_UPLOADING = 2
|
||||||
|
STATUS_UPDATING_ROOM_STATE = 3
|
||||||
|
|
||||||
|
def __init__(self, client: AsyncClient, room: MatrixRoom, exporter: TelegramExporter = None,
|
||||||
|
pack: list[Sticker] = None):
|
||||||
|
|
||||||
|
if not exporter and not pack:
|
||||||
|
raise ValueError('Either exporter or the pack must be set')
|
||||||
|
|
||||||
|
self.client = client
|
||||||
|
self.room = room
|
||||||
|
self.exporter = exporter
|
||||||
|
self.pack = pack
|
||||||
|
|
||||||
|
self.result = -1
|
||||||
|
|
||||||
|
async def _has_permission_to_upload(self) -> bool:
|
||||||
|
return await has_permission(self.client, self.room.room_id, 'state_default')
|
||||||
|
|
||||||
|
async def import_stickerset_to_room(self, pack_name: str):
|
||||||
|
if not await self._has_permission_to_upload():
|
||||||
|
self.result = self.RESULT_NO_PERMISSION
|
||||||
|
return
|
||||||
|
|
||||||
|
stickerset = MatrixStickerset(pack_name)
|
||||||
|
if await is_stickerpack_existing(self.client, self.room.room_id, stickerset.name()):
|
||||||
|
self.result = self.RESULT_PACK_EXISTS
|
||||||
|
return
|
||||||
|
|
||||||
|
yield self.STATUS_DOWNLOADING
|
||||||
|
converted_stickerset = await self.exporter.get_stickerset(stickerset.name())
|
||||||
|
yield self.STATUS_UPLOADING
|
||||||
|
for sticker in converted_stickerset:
|
||||||
|
with tempfile.NamedTemporaryFile('w+b') as file:
|
||||||
|
file.write(sticker.image_data)
|
||||||
|
sticker_mxc = await upload_image(self.client, file.name, sticker.mime_type)
|
||||||
|
stickerset.add_sticker(sticker_mxc, sticker.alt_text)
|
||||||
|
|
||||||
|
if not stickerset.count():
|
||||||
|
self.result = self.RESULT_PACK_EMPTY
|
||||||
|
return
|
||||||
|
|
||||||
|
yield self.STATUS_UPDATING_ROOM_STATE
|
||||||
|
await upload_stickerpack(self.client, self.room.room_id, stickerset)
|
||||||
|
|
||||||
|
self.result = self.RESULT_OK
|
7
requirements.txt
Normal file
7
requirements.txt
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
telethon
|
||||||
|
pillow
|
||||||
|
matrix-nio
|
||||||
|
pyyaml
|
||||||
|
aiofiles
|
||||||
|
lottie
|
||||||
|
cairosvg
|
31
sticker_types.py
Normal file
31
sticker_types.py
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
class Sticker:
|
||||||
|
"""Custom type for easier transfering sticker data between functions and classes with simple lists and returns"""
|
||||||
|
def __init__(self, image_data, alt_text: str, mime_type: str):
|
||||||
|
self.image_data = image_data
|
||||||
|
self.alt_text = alt_text
|
||||||
|
self.mime_type = mime_type
|
||||||
|
|
||||||
|
|
||||||
|
class MatrixStickerset:
|
||||||
|
def __init__(self, pack_name: str):
|
||||||
|
self._content = {
|
||||||
|
"pack": {
|
||||||
|
"display_name": pack_name
|
||||||
|
},
|
||||||
|
"images": {}
|
||||||
|
}
|
||||||
|
|
||||||
|
def add_sticker(self, mxc_uri: str, alt_text: str):
|
||||||
|
self._content['images'][alt_text] = {
|
||||||
|
"url": mxc_uri,
|
||||||
|
"usage": ["sticker"]
|
||||||
|
}
|
||||||
|
|
||||||
|
def count(self):
|
||||||
|
return len(self._content['images'])
|
||||||
|
|
||||||
|
def name(self):
|
||||||
|
return self._content['pack']['display_name']
|
||||||
|
|
||||||
|
def json(self):
|
||||||
|
return self._content
|
86
telegram_exporter.py
Normal file
86
telegram_exporter.py
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from lottie.importers import importers
|
||||||
|
from lottie.exporters import exporters
|
||||||
|
from telethon import TelegramClient
|
||||||
|
from telethon.errors import StickersetInvalidError
|
||||||
|
from telethon.tl.functions.messages import GetStickerSetRequest
|
||||||
|
from telethon.tl.types import InputStickerSetShortName
|
||||||
|
|
||||||
|
from io import BytesIO
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
from sticker_types import Sticker
|
||||||
|
|
||||||
|
|
||||||
|
def _convert_image(data: bytes) -> (bytes, int, int):
|
||||||
|
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
|
||||||
|
new_file = BytesIO()
|
||||||
|
image.save(new_file, "webp")
|
||||||
|
w, h = image.size
|
||||||
|
if w > 256 or h > 256:
|
||||||
|
if w > h:
|
||||||
|
h = int(h / (w / 256))
|
||||||
|
w = 256
|
||||||
|
else:
|
||||||
|
w = int(w / (h / 256))
|
||||||
|
h = 256
|
||||||
|
return new_file.getvalue(), w, h
|
||||||
|
|
||||||
|
|
||||||
|
def _convert_animation(data: bytes, width=256, height=0):
|
||||||
|
importer = importers.get_from_extension('tgs')
|
||||||
|
exporter = exporters.get('webp')
|
||||||
|
an = importer.process(BytesIO(data))
|
||||||
|
|
||||||
|
an.frame_rate = 24
|
||||||
|
|
||||||
|
if width or height:
|
||||||
|
if not width:
|
||||||
|
width = an.width * height / an.height
|
||||||
|
if not height:
|
||||||
|
height = an.height * width / an.width
|
||||||
|
an.scale(width, height)
|
||||||
|
|
||||||
|
out = BytesIO()
|
||||||
|
exporter.process(an, out)
|
||||||
|
return out.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
class TelegramExporter:
|
||||||
|
def __init__(self, api_id: int, api_hash: str, bot_token: str, secrets_filename: str):
|
||||||
|
"""Exports Telegram stickers as images.
|
||||||
|
|
||||||
|
:param api_id: Can be obtained at https://my.telegram.org/apps
|
||||||
|
:param api_hash: Can be obtained at https://my.telegram.org/apps
|
||||||
|
:param bot_token: Required to get stickers, can be obtained by talking to https://t.me/botfather
|
||||||
|
:param secrets_filename: Session name, it would be filename of stored creditials
|
||||||
|
"""
|
||||||
|
self.api_id = api_id
|
||||||
|
self.api_hash = api_hash
|
||||||
|
self.bot_token = bot_token
|
||||||
|
self.secrets_filename = secrets_filename
|
||||||
|
|
||||||
|
self.client = TelegramClient(self.secrets_filename, self.api_id, self.api_hash)
|
||||||
|
|
||||||
|
async def connect(self):
|
||||||
|
await self.client.start(bot_token=self.bot_token)
|
||||||
|
|
||||||
|
async def get_stickerset(self, pack_name: str) -> list[Sticker]:
|
||||||
|
result: List[Sticker] = list()
|
||||||
|
|
||||||
|
try:
|
||||||
|
sticker_set = await self.client(GetStickerSetRequest(InputStickerSetShortName(short_name=pack_name), hash=0))
|
||||||
|
except StickersetInvalidError:
|
||||||
|
return result
|
||||||
|
for sticker_document in sticker_set.documents:
|
||||||
|
alt = sticker_document.attributes[1].alt
|
||||||
|
raw_data = await self.client.download_media(sticker_document, file=bytes)
|
||||||
|
if sticker_document.mime_type == 'image/webp':
|
||||||
|
data, width, height = _convert_image(raw_data)
|
||||||
|
result.append(Sticker(data, alt, 'image/png'))
|
||||||
|
if sticker_document.mime_type == 'application/x-tgsticker':
|
||||||
|
data = _convert_animation(raw_data)
|
||||||
|
result.append(Sticker(data, alt, 'image/webp'))
|
||||||
|
|
||||||
|
return result
|
Loading…
Add table
Reference in a new issue