From 469d150096b561803ca4df3faa83ef000eceb066 Mon Sep 17 00:00:00 2001 From: Kentai Radiquum Date: Mon, 20 Jan 2025 23:12:36 +0500 Subject: [PATCH] Feat/CLI: Add a preview command --- stickerbridge/{sticker-cli.py => cli.py} | 142 ++++++++++++++++++++--- stickerbridge/matrix_reuploader.py | 2 +- 2 files changed, 129 insertions(+), 15 deletions(-) rename stickerbridge/{sticker-cli.py => cli.py} (63%) diff --git a/stickerbridge/sticker-cli.py b/stickerbridge/cli.py similarity index 63% rename from stickerbridge/sticker-cli.py rename to stickerbridge/cli.py index 8266671..ac1ff87 100644 --- a/stickerbridge/sticker-cli.py +++ b/stickerbridge/cli.py @@ -9,6 +9,7 @@ import logging from nio import AsyncClient, RoomVisibility from matrix_reuploader import MatrixReuploader from telegram_exporter import TelegramExporter +from matrix_preview import MatrixPreview class ArgParser(argparse.ArgumentParser): def error(self, message): @@ -43,6 +44,20 @@ import_cmd.add_argument('--update-pack', '-upd', action='store_true', help='Upda import_cmd.epilog = 'IF boolean flags are true in "config.yaml" or "cli.yaml", and are provided here, they are applied as a False.' +preview_cmd = subparsers.add_parser('preview', help='Preview uploaded stickerpack.') +preview_cmd.add_argument('--pack-name', type=str, help='Sticker pack name. If pack_name is not provided, then preview is generated for a primary pack.', nargs="?", default="") +preview_cmd.add_argument('--room', type=str, help='Room where is stickerpack located. otherwise use room_prefix+pack_name', nargs="?", default="") + +preview_cmd.add_argument('--primary', '-p', action='store_true', help='Force preview a primary pack for the room.') +preview_cmd.add_argument('--artist', '-a', type=str, help='Include stickerpack artist in the last message and room topic', nargs="?", default='False') +preview_cmd.add_argument('--artist-url', '-au', type=str, help='Add Artist page url in to the last message and room topic', nargs="?", default='False') +preview_cmd.add_argument('--space', '-s', type=str, help='Include space name in the room topic', nargs="?", default='False') +preview_cmd.add_argument('--tg-url', '-tu', type=str, help='Include stickerpack url in the last message', nargs="?", default='False') +preview_cmd.add_argument('--preview-url', '-pu', type=str, help='Include stickerpack preview url in the room topic', nargs="?", default='False') +preview_cmd.add_argument('--update-room', '-upd', action='store_true', help='Update room avatar, name and topic') + +preview_cmd.epilog = 'IF flags are provided, without parameters, then parameters are taken from the pack content if were provided on import or config!\nIF boolean flags are true in "config.yaml" or "cli.yaml", and are provided here, they are applied as a False.' + async def main(args): os.makedirs('data', exist_ok=True) if not os.path.exists(args.config): @@ -69,11 +84,15 @@ async def main(args): client.restore_login(config['matrix_username'], config['matrix_deviceid'], config['matrix_token']) login_response = await client.whoami() else: - raise ValueError(f'Unknown login type: "{config["matrix_login_type"]}" only "password" and "access_token" are supported') + await client.close() + logging.error(f'Unknown login type: "{config["matrix_login_type"]}" only "password" and "access_token" are supported') + sys.exit(1) logging.info("Logged In: " + login_response.user_id) if sys.argv[1] == 'import': await import_stickerpack(args, client, config, cli_config) + if sys.argv[1] == 'preview': + await preview_stickerpack(args, client, config, cli_config) await client.close() @@ -84,10 +103,6 @@ async def import_stickerpack(args: argparse.Namespace, client: AsyncClient, conf if args.import_name is None: args.__setattr__("import_name", args.pack_name) - for a in args.__dict__.keys(): - if args.__getattribute__(a) is True and isinstance(args.__getattribute__(a), bool) and a not in ['primary', 'json', 'update_pack']: - args.__setattr__(a, not cli_config['import'][a]) - __exporter_args = [] if args.primary: __exporter_args.append('-p') @@ -114,6 +129,8 @@ async def import_stickerpack(args: argparse.Namespace, client: AsyncClient, conf __exporter_args.append(artistUrl) room = await create_or_get_room(args, client, config, cli_config) + if not room: + return tg_exporter = TelegramExporter(config['telegram_api_id'], config['telegram_api_hash'], config['telegram_bot_token'], 'data/telegram_secrets') @@ -145,13 +162,14 @@ async def import_stickerpack(args: argparse.Namespace, client: AsyncClient, conf } text = switch.get(status, "Warning: Unknown status") logging.info(text) - await tg_exporter.close() + async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict): if not cli_config['room']['homeserver']: - raise ValueError('Please set room homeserver in cli.yaml') + logging.error('Please set room homeserver in cli.yaml') + return False room_alias = f'{cli_config['room']['prefix']}{args.pack_name}' if args.room: @@ -159,7 +177,8 @@ async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, conf if room_alias.startswith("#"): if not ":" in room_alias: - raise ValueError(f'Invalid room: "{room_alias}". it should be "#room:example.com"') + logging.error(f'Invalid room: "{room_alias}". it should be "#room:example.com"') + return False try: room = await client.room_resolve_alias(room_alias) room = room.room_id @@ -179,25 +198,29 @@ async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, conf room = None if not (args.create_room or cli_config['import']['create_room']) and not room: - raise ValueError(f'Room "{room_alias}" does not exist. Use --create-room to create it.') + logging.error(f'Room "{room_alias}" does not exist. Use --create-room to create it.') + return False if room: return room - space = args.space or cli_config['room']['space'] + space = args.space or config['preview']['space'] if space: if space.startswith("#"): if not ":" in space: - raise ValueError(f'Invalid space: "{space}". it should be "#space:example.com"') + logging.error(f'Invalid space: "{space}". it should be "#space:example.com"') + return False try: space = await client.room_resolve_alias(space) space = space.room_id except: - raise ValueError(f'Space "{space}" does not exist.') + logging.error(f'Space "{space}" does not exist.') + return False try: await client.room_get_state_event(space, 'm.room.create') except: - raise ValueError(f'Space "{space}" does not exist.') + logging.error(f'Space "{space}" does not exist.') + return False room_initial_state = [] if space: @@ -219,7 +242,8 @@ async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, conf initial_state=room_initial_state, ) except: - raise ValueError(f'Failed to create room "{room_alias}: {room}"') + logging.error(f'Failed to create room "{room_alias}: {room}"') + return False RoomPowerLevels = { "users": { @@ -271,6 +295,96 @@ async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, conf logging.info(f'Created room {room.room_id}, #{room_alias}:{cli_config["room"]["homeserver"]}') return room.room_id + +async def preview_stickerpack(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict): + + if args.pack_name == "" and args.room == "": + logging.error('At least one of "pack-name" or "room" must be set') + return False + + __preview_args = [] + __pack_name = args.pack_name + if args.primary: + __pack_name = "" + if args.update_room: + __preview_args.append('-upd') + if args.artist != 'False' or cli_config['preview']['include_artist']: + __preview_args.append('-a') + if args.artist is not None and args.artist != 'False': + __preview_args.append(args.artist) + if args.artist_url != 'False' or cli_config['preview']['include_artist_url']: + __preview_args.append('-au') + if args.artist_url is not None and args.artist_url != 'False': + __preview_args.append(args.artist_url) + if args.space != 'False' or cli_config['preview']['not_include_space']: + __preview_args.append('-s') + if args.space is not None: + __preview_args.append(args.space) + if args.tg_url != 'False' or cli_config['preview']['include_tg_url']: + __preview_args.append('-tu') + if args.tg_url is not None and args.tg_url != 'False': + __preview_args.append(args.tg_url) + if args.preview_url != 'False' or cli_config['preview']['not_include_preview_url']: + __preview_args.append('-pu') + if args.preview_url is not None: + __preview_args.append(args.preview_url) + + room = await get_room(args, client, config, cli_config) + if not room: + return False + + previewer = MatrixPreview(client, AttrDict({'room_id': room})) + async for status in previewer.generate_stickerset_preview_to_room(__pack_name, __preview_args): + switch = { + MatrixPreview.STATUS_NO_PERMISSION: ( + "I do not have permissions to update this room\n" + "Please, give me mod 🙏" + ), + MatrixPreview.STATUS_PACK_NOT_EXISTS: ( + f"Stickerpack '{__pack_name}' does not exists.\n" + "Please create it first." + ), + MatrixPreview.STATUS_UPDATING_ROOM_STATE: f"Updating room state...", + } + text = switch.get(status, "Warning: Unknown status") + logging.info(text) + + +async def get_room(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict): + room_alias = f'{cli_config['room']['prefix']}{args.pack_name}' + if args.room != "": + room_alias = args.room + + if room_alias.startswith("#"): + if not ":" in room_alias: + logging.error(f'Invalid room: "{room_alias}". it should be "#room:example.com"') + return False + try: + room = await client.room_resolve_alias(room_alias) + room = room.room_id + except: + room = None + elif room_alias.startswith("!"): + room = room_alias + try: + await client.room_get_state_event(room, 'm.room.create') + except: + room = None + else: + try: + room = await client.room_resolve_alias(f"#{room_alias}:{cli_config['room']['homeserver']}") + room = room.room_id + except: + room = None + + if not room: + logging.error(f'Room "{room_alias}" does not exist.') + return False + + if room: + return room + + if __name__ == '__main__': args = parser.parse_args() if len(sys.argv)==1: diff --git a/stickerbridge/matrix_reuploader.py b/stickerbridge/matrix_reuploader.py index cf951fa..3eef2f1 100644 --- a/stickerbridge/matrix_reuploader.py +++ b/stickerbridge/matrix_reuploader.py @@ -65,7 +65,7 @@ async def _parse_args(args: list) -> dict[str, str]: if arg in ["-j", "--json"]: parsed_args["json"] = not parsed_args["json"] if arg in ["-upd", "--update-pack"]: - parsed_args["json"] = not parsed_args["json"] + parsed_args["update_pack"] = not parsed_args["update_pack"] return parsed_args