Feat: Sticker Import via CLI and bug fixes

This commit is contained in:
Kentai Radiquum 2025-01-20 01:27:05 +05:00
parent a48bc67375
commit 1d8a128902
Signed by: Radiquum
GPG key ID: 858E8EE696525EED
3 changed files with 171 additions and 43 deletions

View file

@ -82,6 +82,7 @@ class Command:
"\t\t-a | --artist <artist> - Use this flag if you want to include sticker pack artist to json file\n" "\t\t-a | --artist <artist> - Use this flag if you want to include sticker pack artist to json file\n"
"\t\t-au | --artist-url <artist_url> - Use this flag if you want to add artist url to json file\n" "\t\t-au | --artist-url <artist_url> - Use this flag if you want to add artist url to json file\n"
"\t\t-r | --rating <safe|questionable|explicit|s|q|e|sfw|nsfw> - Use this flag if you want add rating to json file\n" "\t\t-r | --rating <safe|questionable|explicit|s|q|e|sfw|nsfw> - Use this flag if you want add rating to json file\n"
"\t\t-upd | --update-room - Update pack if it already exists\n"
"\t\tIF boolean flags are true in config, and are provided, they are applied as a False.\n" "\t\tIF boolean flags are true in config, and are provided, they are applied as a False.\n"
"preview [pack_name] - Use this to create a preview for a Telegram stickers. If pack_name is not provided, then preview is generated for a primary pack.\n" "preview [pack_name] - Use this to create a preview for a Telegram stickers. If pack_name is not provided, then preview is generated for a primary pack.\n"
"\tFlags:\n" "\tFlags:\n"

View file

@ -17,8 +17,8 @@ async def _parse_args(args: list) -> dict[str, str]:
config_params = yaml.safe_load(config_file) config_params = yaml.safe_load(config_file)
parsed_args = { parsed_args = {
"default": config_params['import']['set_primary'] or False, "default": config_params['import']['primary'] or False,
"json": config_params['import']['save_json'] or False, "json": config_params['import']['json'] or False,
"artist" : None, "artist" : None,
"artist_url" : None, "artist_url" : None,
"rating" : None, "rating" : None,
@ -62,7 +62,7 @@ async def _parse_args(args: list) -> dict[str, str]:
parsed_args["default"] = not parsed_args["default"] parsed_args["default"] = not parsed_args["default"]
if arg in ["-j", "--json"]: if arg in ["-j", "--json"]:
parsed_args["json"] = not parsed_args["json"] parsed_args["json"] = not parsed_args["json"]
if arg in ["-u", "--upd"]: if arg in ["-upd", "--update-pack"]:
parsed_args["json"] = not parsed_args["json"] parsed_args["json"] = not parsed_args["json"]
return parsed_args return parsed_args
@ -122,7 +122,7 @@ class MatrixReuploader:
return return
yield self.STATUS_DOWNLOADING yield self.STATUS_DOWNLOADING
converted_stickerset = await self.exporter.get_stickerset(import_name) converted_stickerset = await self.exporter.get_stickerset(pack_name)
yield self.STATUS_UPLOADING yield self.STATUS_UPLOADING
stickerset = MatrixStickerset(import_name, pack_name, parsed_args["rating"], {"name": parsed_args["artist"], "url": parsed_args["artist_url"]}) stickerset = MatrixStickerset(import_name, pack_name, parsed_args["rating"], {"name": parsed_args["artist"], "url": parsed_args["artist_url"]})

View file

@ -7,6 +7,7 @@ import shutil
import logging import logging
from nio import AsyncClient, RoomVisibility from nio import AsyncClient, RoomVisibility
from matrix_reuploader import MatrixReuploader
from telegram_exporter import TelegramExporter from telegram_exporter import TelegramExporter
class ArgParser(argparse.ArgumentParser): class ArgParser(argparse.ArgumentParser):
@ -15,6 +16,11 @@ class ArgParser(argparse.ArgumentParser):
self.print_help() self.print_help()
sys.exit(2) sys.exit(2)
class AttrDict(dict):
def __init__(self, *args, **kwargs):
super(AttrDict, self).__init__(*args, **kwargs)
self.__dict__ = self
parser = ArgParser() parser = ArgParser()
parser.add_argument('--config', '-c', help='Path to config.yaml', default='config.yaml') parser.add_argument('--config', '-c', help='Path to config.yaml', default='config.yaml')
@ -26,13 +32,14 @@ import_cmd.add_argument('pack_name', type=str, help='Sticker pack url or shortna
import_cmd.add_argument('import_name', type=str, help='Sticker pack display name', nargs='?') import_cmd.add_argument('import_name', type=str, help='Sticker pack display name', nargs='?')
import_cmd.add_argument('--primary', '-p', action='store_true', help='Upload pack as a default pack for this room') import_cmd.add_argument('--primary', '-p', action='store_true', help='Upload pack as a default pack for this room')
import_cmd.add_argument('--json', '-j', action='store_true', help='create a "maunium stickerpicker" compatible json file with downloaded stickers') import_cmd.add_argument('--json', '-j', action='store_true', help='Create a "maunium stickerpicker" compatible json file with downloaded stickers')
import_cmd.add_argument('--artist', '-a', action='store_true', help='Ask for the artist name of the sticker pack') import_cmd.add_argument('--artist', '-a', type=str, help='Ask for the artist name of the sticker pack', nargs="?", default='False')
import_cmd.add_argument('--artist-url', '-au', action='store_true', help='Ask for the artist page url') import_cmd.add_argument('--artist-url', '-au', type=str, help='Ask for the artist page url', nargs="?", default='False')
import_cmd.add_argument('--rating', '-r', action='store_true', help='Ask for rating of the sticker pack') import_cmd.add_argument('--rating', '-r', choices=('S', 'Q', 'E', 'U'), help='Set the rating of the sticker pack. Safe/Questionable/Explicit/Unrated ', default='U')
import_cmd.add_argument('--room', '-rm', type=str, help='Set a room for the sticker upload')
import_cmd.add_argument('--create-room', '-cr', action='store_true', help='Create a new room for imported stickers') import_cmd.add_argument('--create-room', '-cr', action='store_true', help='Create a new room for imported stickers')
import_cmd.add_argument('--space', '-s', type=str, help='Space room id to include the new room in. (You will need to invite the bot first!)') import_cmd.add_argument('--space', '-s', type=str, help='Space to include the new room in. (You will need to invite the bot first!)')
import_cmd.add_argument('--update-pack', '-u', action='store_true', help='Update pack if it already exists')
import_cmd.epilog = 'IF boolean flags are true in "cli.yaml", and are provided here, they are applied as a False.' import_cmd.epilog = 'IF boolean flags are true in "cli.yaml", and are provided here, they are applied as a False.'
@ -54,9 +61,6 @@ async def main(args):
client = AsyncClient(config['matrix_homeserver'], config['matrix_username']) client = AsyncClient(config['matrix_homeserver'], config['matrix_username'])
client.device_id = config['matrix_bot_name'] client.device_id = config['matrix_bot_name']
tg_exporter = TelegramExporter(config['telegram_api_id'], config['telegram_api_hash'], config['telegram_bot_token'],
'data/telegram_secrets')
await tg_exporter.connect()
if config['matrix_login_type'] == 'password': if config['matrix_login_type'] == 'password':
login_response = await client.login(config['matrix_password']) login_response = await client.login(config['matrix_password'])
@ -73,21 +77,111 @@ async def main(args):
async def import_stickerpack(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict): async def import_stickerpack(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict):
if args.pack_name.startswith('https://t.me/addstickers/'): if args.pack_name.startswith('https://t.me/addstickers/'):
args.pack_name = args.pack_name.split('/')[-1] args.__setattr__("pack_name", args.pack_name.split('/')[-1])
if args.import_name is None:
args.__setattr__("import_name", args.pack_name)
for a in args.__dict__.keys(): for a in args.__dict__.keys():
if args.__getattribute__(a) is True and isinstance(args.__getattribute__(a), bool): if args.__getattribute__(a) is True and isinstance(args.__getattribute__(a), bool) and a not in ['primary', 'json', 'update_pack']:
args.__setattr__(a, not cli_config['import'][a]) args.__setattr__(a, not cli_config['import'][a])
room_alias = f'{cli_config['room_prefix']}{args.pack_name}' __exporter_args = []
room = await create_or_get_room(args, client, config, cli_config, room_alias) if args.primary:
__exporter_args.append('-p')
if args.json:
__exporter_args.append('-j')
if args.update_pack:
__exporter_args.append('-upd')
if args.rating:
__exporter_args.append('-r')
__exporter_args.append(args.rating)
if args.artist != 'False' or cli_config['import']['artist']:
if args.artist is None or args.artist == 'False':
artistName = input('Artist name: ')
else:
artistName = args.artist
__exporter_args.append('-a')
__exporter_args.append(artistName)
if args.artist_url != 'False' or cli_config['import']['artist_url']:
if args.artist_url is None or args.artist_url == 'False':
artistUrl = input('Artist url: ')
else:
artistUrl = args.artist_url
__exporter_args.append('-au')
__exporter_args.append(artistUrl)
async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict, room_alias: str): room = await create_or_get_room(args, client, config, cli_config)
if not cli_config['room_homeserver']: tg_exporter = TelegramExporter(config['telegram_api_id'], config['telegram_api_hash'], config['telegram_bot_token'],
raise ValueError('Please set room_homeserver in cli.yaml') 'data/telegram_secrets')
space = args.space or cli_config['room_space'] await tg_exporter.connect()
reuploader = MatrixReuploader(client, AttrDict({'room_id': room}), exporter=tg_exporter)
async for status in reuploader.import_stickerset_to_room(
args.pack_name, args.import_name, __exporter_args
):
switch = {
MatrixReuploader.STATUS_DOWNLOADING: f"Downloading stickerpack {args.pack_name}...",
MatrixReuploader.STATUS_UPLOADING: f"Uploading stickerpack {args.pack_name}...",
MatrixReuploader.STATUS_UPDATING_ROOM_STATE: f"Updating room state...",
MatrixReuploader.STATUS_OK: "Done",
MatrixReuploader.STATUS_NO_PERMISSION: (
"I do not have permissions to create any stickerpack in this room\n"
"Please, give me mod 🙏"
),
MatrixReuploader.STATUS_PACK_EXISTS: (
f"Stickerpack '{args.pack_name}' already exists.\n"
"Please delete it first."
),
MatrixReuploader.STATUS_PACK_UPDATE: (
f"Updating Stickerpack '{args.pack_name}'.\n"
),
MatrixReuploader.STATUS_PACK_EMPTY: (
f"Warning: Telegram pack {args.pack_name} find out empty or not existing."
),
}
text = switch.get(status, "Warning: Unknown status")
logging.info(text)
async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict):
if not cli_config['room']['homeserver']:
raise ValueError('Please set room homeserver in cli.yaml')
room_alias = f'{cli_config['room']['prefix']}{args.pack_name}'
if args.room:
room_alias = args.room
if room_alias.startswith("#"):
if not ":" in room_alias:
raise ValueError(f'Invalid room: "{room_alias}". it should be "#room:example.com"')
try:
room = await client.room_resolve_alias(room_alias)
room = room.room_id
except:
room = None
elif room_alias.startswith("!"):
room = room_alias
try:
await client.room_get_state_event(room, 'm.room.create')
except:
room = None
else:
try:
room = await client.room_resolve_alias(f"#{room_alias}:{cli_config['room']['homeserver']}")
room = room.room_id
except:
room = None
if not (args.create_room or cli_config['import']['create_room']) and not room:
raise ValueError(f'Room "{room_alias}" does not exist. Use --create-room to create it.')
if room:
return room
space = args.space or cli_config['room']['space']
if space: if space:
if space.startswith("#"): if space.startswith("#"):
if not ":" in space: if not ":" in space:
@ -102,18 +196,6 @@ async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, conf
except: except:
raise ValueError(f'Space "{space}" does not exist.') raise ValueError(f'Space "{space}" does not exist.')
try:
room = await client.room_resolve_alias(f"#{room_alias}:{cli_config['room_homeserver']}")
room = room.room_id
except:
room = None
if not (args.create_room or cli_config['import']['create_room']) and not room:
raise ValueError(f'Room "{room_alias}" does not exist. Use --create-room to create it.')
if room:
return room
room_initial_state = [] room_initial_state = []
if space: if space:
room_initial_state = [{ room_initial_state = [{
@ -121,24 +203,69 @@ async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, conf
"state_key": space, "state_key": space,
"content": { "content": {
"canonical": True, "canonical": True,
"via": [cli_config['room_homeserver']] "via": [cli_config['room']['homeserver']]
} }
}] }]
room = await client.room_create( try:
visibility=RoomVisibility.public, room = await client.room_create(
alias=room_alias, visibility=RoomVisibility.public,
name=args.pack_name, alias=room_alias,
topic=f"Sticker pack: {args.pack_name}", name=args.pack_name,
initial_state=room_initial_state topic=f"Sticker pack: {args.pack_name}",
) initial_state=room_initial_state,
)
except:
raise ValueError(f'Failed to create room "{room_alias}: {room}"')
RoomPowerLevels = {
"users": {
config["matrix_username"]: 100
},
"users_default": 0,
"events": {
"m.room.name": 50,
"m.room.power_levels": 100,
"m.room.history_visibility": 100,
"m.room.canonical_alias": 50,
"m.room.avatar": 50,
"m.room.tombstone": 100,
"m.room.server_acl": 100,
"m.room.encryption": 100,
"m.space.child": 50,
"m.room.topic": 50,
"m.room.pinned_events": 50,
"m.reaction": 0,
"m.room.redaction": 50,
"org.matrix.msc3401.call": 50,
"org.matrix.msc3401.call.member": 50,
"im.vector.modular.widgets": 50
},
"events_default": 50,
"state_default": 50,
"ban": 50,
"kick": 50,
"redact": 50,
"invite": 0,
"historical": 100,
"m.call.invite": 50
}
if cli_config['room'].get('autoinvite', False):
for user in cli_config['room']['autoinvite']:
await client.room_invite(room.room_id, user['user'])
if user['power'] > 100:
user['power'] = 100
RoomPowerLevels['users'][user['user']] = user['power']
await client.room_put_state(room.room_id, "m.room.power_levels", RoomPowerLevels)
await client.room_put_state(room.room_id, "m.room.guest_access", {"guest_access": "can_join"})
if space: if space:
await client.room_put_state(space, "m.space.child", { await client.room_put_state(space, "m.space.child", {
"suggested": False, "suggested": False,
"via": [cli_config['room_homeserver']] "via": [cli_config['room']['homeserver']]
}, room.room_id) }, room.room_id)
logging.info(f'Created room {room.room_id}, #{room_alias}:{cli_config["room_homeserver"]}') logging.info(f'Created room {room.room_id}, #{room_alias}:{cli_config["room"]["homeserver"]}')
return room.room_id return room.room_id
if __name__ == '__main__': if __name__ == '__main__':