mirror of
https://github.com/wah-su/stickerbridge.git
synced 2025-04-05 15:54:41 +00:00
75 lines
2.6 KiB
Python
75 lines
2.6 KiB
Python
import os
|
|
|
|
import aiofiles.os
|
|
import logging
|
|
|
|
from nio import AsyncClient, UploadResponse, ErrorResponse, RoomGetStateEventError
|
|
|
|
from sticker_types import MatrixStickerset
|
|
|
|
|
|
async def send_text_to_room(client: AsyncClient, room_id: str, message: str):
    """Send a plain-text message to a room.

    Builds an ``m.text`` content dict whose body is *message* and sends it
    to *room_id* as an ``m.room.message`` event through *client*.

    Returns whatever ``client.room_send`` returns.
    """
    payload = dict(msgtype="m.text", body=message)
    response = await client.room_send(room_id, "m.room.message", payload)
    return response
|
|
|
|
|
|
async def has_permission(client: AsyncClient, room_id: str, permission_type: str):
    """Reimplementation of AsyncClient.has_permission because matrix-nio version always gives an error
    https://github.com/poljar/matrix-nio/issues/324

    Fetches the room's ``m.room.power_levels`` state event and compares the
    power level of ``client.user`` with the level stored under the
    *permission_type* key of that event's content.

    Returns:
        ``True``/``False`` when both levels could be determined, otherwise an
        ``ErrorResponse`` describing what was missing. If the state event
        request itself fails, the error response from the server is returned
        unchanged. Callers should test the result with ``isinstance`` (or
        ``is True``) rather than relying on truthiness, since the error case
        is an object, not a boolean.
    """
    user_id = client.user
    power_levels = await client.room_get_state_event(room_id, "m.room.power_levels")

    # A failed request yields an error response that carries no ``content``;
    # hand it back instead of crashing with AttributeError below.
    if isinstance(power_levels, ErrorResponse):
        return power_levels

    levels = power_levels.content

    # A per-user entry wins; otherwise fall back to the room-wide default.
    try:
        user_power_level = levels['users'][user_id]
    except KeyError:
        try:
            user_power_level = levels['users_default']
        except KeyError:
            return ErrorResponse("Couldn't get user power levels")

    try:
        permission_power_level = levels[permission_type]
    except KeyError:
        return ErrorResponse(f"permission_type {permission_type} unknown")

    return user_power_level >= permission_power_level
|
|
|
|
|
|
async def is_stickerpack_existing(client: AsyncClient, room_id: str, pack_name: str):
    """Tell whether *room_id* already holds a non-empty sticker pack called *pack_name*.

    Looks up the ``im.ponies.room_emotes`` state event with *pack_name* as
    its state key. An ``M_NOT_FOUND`` error means the pack does not exist;
    otherwise the pack counts as existing when the event content is not an
    empty dict.

    NOTE(review): error responses with any other status code are not handled
    here and fall through to the ``.content`` access -- confirm how callers
    expect such failures to surface.
    """
    state = await client.room_get_state_event(room_id, 'im.ponies.room_emotes', pack_name)

    not_found = isinstance(state, RoomGetStateEventError) and state.status_code == 'M_NOT_FOUND'
    if not_found:
        return False

    return state.content != {}
|
|
|
|
|
|
async def upload_stickerpack(client: AsyncClient, room_id: str, stickerset: MatrixStickerset):
    """Store *stickerset* in *room_id* as an ``im.ponies.room_emotes`` state event.

    The event content is ``stickerset.json()`` and the state key is
    ``stickerset.name()``. Returns the result of ``client.room_put_state``.
    """
    pack_content = stickerset.json()
    pack_key = stickerset.name()
    result = await client.room_put_state(
        room_id,
        'im.ponies.room_emotes',
        pack_content,
        state_key=pack_key,
    )
    return result
|
|
|
|
|
|
async def upload_image(client: AsyncClient, image: str, mime_type: str):
    """Upload the file at path *image* to the homeserver.

    Args:
        client: Client used to perform the upload.
        image: Filesystem path of the file to upload.
        mime_type: Content type reported to the server for the upload.

    Returns:
        The content URI of the uploaded file on success, or an empty string
        when the server did not answer with an ``UploadResponse`` (the failure
        is logged).
    """
    file_stat = await aiofiles.os.stat(image)

    # The file is only read, so open it read-only; "r+b" would needlessly
    # demand write permission and fail on read-only files.
    async with aiofiles.open(image, "rb") as f:
        resp, _ = await client.upload(
            f,
            content_type=mime_type,
            filename=os.path.basename(image),
            filesize=file_stat.st_size,
        )

    if not isinstance(resp, UploadResponse):
        logging.error("Failed to upload image (%s). Failure response: %s", image, resp)
        return ""

    logging.debug("Image %s was uploaded successfully to server.", image)
    return resp.content_uri
|
|
|
|
|
|
async def upload_avatar(client: AsyncClient, image: str, mime_type: str = ""):
    """Upload the file at *image* and set it as the client's avatar.

    Args:
        client: Client whose avatar is changed.
        image: Filesystem path of the avatar file.
        mime_type: Content type of the file. When empty, it is guessed from
            the filename, falling back to ``application/octet-stream``.

    Nothing is returned. The avatar is left untouched when the upload yields
    no content URI.
    """
    import mimetypes

    # upload_image requires a MIME type; derive one when the caller gave none.
    if not mime_type:
        guessed_type, _ = mimetypes.guess_type(image)
        mime_type = guessed_type or "application/octet-stream"

    avatar_mxc = await upload_image(client, image, mime_type)
    if avatar_mxc:
        await client.set_avatar(avatar_mxc)
|