mirror of
https://github.com/wah-su/stickerbridge.git
synced 2025-04-04 23:34:40 +00:00
Feat/CLI: Add a preview command
This commit is contained in:
parent
0e5d019a8a
commit
469d150096
2 changed files with 129 additions and 15 deletions
|
@ -9,6 +9,7 @@ import logging
|
||||||
from nio import AsyncClient, RoomVisibility
|
from nio import AsyncClient, RoomVisibility
|
||||||
from matrix_reuploader import MatrixReuploader
|
from matrix_reuploader import MatrixReuploader
|
||||||
from telegram_exporter import TelegramExporter
|
from telegram_exporter import TelegramExporter
|
||||||
|
from matrix_preview import MatrixPreview
|
||||||
|
|
||||||
class ArgParser(argparse.ArgumentParser):
|
class ArgParser(argparse.ArgumentParser):
|
||||||
def error(self, message):
|
def error(self, message):
|
||||||
|
@ -43,6 +44,20 @@ import_cmd.add_argument('--update-pack', '-upd', action='store_true', help='Upda
|
||||||
|
|
||||||
import_cmd.epilog = 'IF boolean flags are true in "config.yaml" or "cli.yaml", and are provided here, they are applied as a False.'
|
import_cmd.epilog = 'IF boolean flags are true in "config.yaml" or "cli.yaml", and are provided here, they are applied as a False.'
|
||||||
|
|
||||||
|
preview_cmd = subparsers.add_parser('preview', help='Preview uploaded stickerpack.')
|
||||||
|
preview_cmd.add_argument('--pack-name', type=str, help='Sticker pack name. If pack_name is not provided, then preview is generated for a primary pack.', nargs="?", default="")
|
||||||
|
preview_cmd.add_argument('--room', type=str, help='Room where is stickerpack located. otherwise use room_prefix+pack_name', nargs="?", default="")
|
||||||
|
|
||||||
|
preview_cmd.add_argument('--primary', '-p', action='store_true', help='Force preview a primary pack for the room.')
|
||||||
|
preview_cmd.add_argument('--artist', '-a', type=str, help='Include stickerpack artist in the last message and room topic', nargs="?", default='False')
|
||||||
|
preview_cmd.add_argument('--artist-url', '-au', type=str, help='Add Artist page url in to the last message and room topic', nargs="?", default='False')
|
||||||
|
preview_cmd.add_argument('--space', '-s', type=str, help='Include space name in the room topic', nargs="?", default='False')
|
||||||
|
preview_cmd.add_argument('--tg-url', '-tu', type=str, help='Include stickerpack url in the last message', nargs="?", default='False')
|
||||||
|
preview_cmd.add_argument('--preview-url', '-pu', type=str, help='Include stickerpack preview url in the room topic', nargs="?", default='False')
|
||||||
|
preview_cmd.add_argument('--update-room', '-upd', action='store_true', help='Update room avatar, name and topic')
|
||||||
|
|
||||||
|
preview_cmd.epilog = 'IF flags are provided, without parameters, then parameters are taken from the pack content if were provided on import or config!\nIF boolean flags are true in "config.yaml" or "cli.yaml", and are provided here, they are applied as a False.'
|
||||||
|
|
||||||
async def main(args):
|
async def main(args):
|
||||||
os.makedirs('data', exist_ok=True)
|
os.makedirs('data', exist_ok=True)
|
||||||
if not os.path.exists(args.config):
|
if not os.path.exists(args.config):
|
||||||
|
@ -69,11 +84,15 @@ async def main(args):
|
||||||
client.restore_login(config['matrix_username'], config['matrix_deviceid'], config['matrix_token'])
|
client.restore_login(config['matrix_username'], config['matrix_deviceid'], config['matrix_token'])
|
||||||
login_response = await client.whoami()
|
login_response = await client.whoami()
|
||||||
else:
|
else:
|
||||||
raise ValueError(f'Unknown login type: "{config["matrix_login_type"]}" only "password" and "access_token" are supported')
|
await client.close()
|
||||||
|
logging.error(f'Unknown login type: "{config["matrix_login_type"]}" only "password" and "access_token" are supported')
|
||||||
|
sys.exit(1)
|
||||||
logging.info("Logged In: " + login_response.user_id)
|
logging.info("Logged In: " + login_response.user_id)
|
||||||
|
|
||||||
if sys.argv[1] == 'import':
|
if sys.argv[1] == 'import':
|
||||||
await import_stickerpack(args, client, config, cli_config)
|
await import_stickerpack(args, client, config, cli_config)
|
||||||
|
if sys.argv[1] == 'preview':
|
||||||
|
await preview_stickerpack(args, client, config, cli_config)
|
||||||
|
|
||||||
await client.close()
|
await client.close()
|
||||||
|
|
||||||
|
@ -84,10 +103,6 @@ async def import_stickerpack(args: argparse.Namespace, client: AsyncClient, conf
|
||||||
if args.import_name is None:
|
if args.import_name is None:
|
||||||
args.__setattr__("import_name", args.pack_name)
|
args.__setattr__("import_name", args.pack_name)
|
||||||
|
|
||||||
for a in args.__dict__.keys():
|
|
||||||
if args.__getattribute__(a) is True and isinstance(args.__getattribute__(a), bool) and a not in ['primary', 'json', 'update_pack']:
|
|
||||||
args.__setattr__(a, not cli_config['import'][a])
|
|
||||||
|
|
||||||
__exporter_args = []
|
__exporter_args = []
|
||||||
if args.primary:
|
if args.primary:
|
||||||
__exporter_args.append('-p')
|
__exporter_args.append('-p')
|
||||||
|
@ -114,6 +129,8 @@ async def import_stickerpack(args: argparse.Namespace, client: AsyncClient, conf
|
||||||
__exporter_args.append(artistUrl)
|
__exporter_args.append(artistUrl)
|
||||||
|
|
||||||
room = await create_or_get_room(args, client, config, cli_config)
|
room = await create_or_get_room(args, client, config, cli_config)
|
||||||
|
if not room:
|
||||||
|
return
|
||||||
|
|
||||||
tg_exporter = TelegramExporter(config['telegram_api_id'], config['telegram_api_hash'], config['telegram_bot_token'],
|
tg_exporter = TelegramExporter(config['telegram_api_id'], config['telegram_api_hash'], config['telegram_bot_token'],
|
||||||
'data/telegram_secrets')
|
'data/telegram_secrets')
|
||||||
|
@ -145,13 +162,14 @@ async def import_stickerpack(args: argparse.Namespace, client: AsyncClient, conf
|
||||||
}
|
}
|
||||||
text = switch.get(status, "Warning: Unknown status")
|
text = switch.get(status, "Warning: Unknown status")
|
||||||
logging.info(text)
|
logging.info(text)
|
||||||
|
|
||||||
await tg_exporter.close()
|
await tg_exporter.close()
|
||||||
|
|
||||||
|
|
||||||
async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict):
|
async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict):
|
||||||
|
|
||||||
if not cli_config['room']['homeserver']:
|
if not cli_config['room']['homeserver']:
|
||||||
raise ValueError('Please set room homeserver in cli.yaml')
|
logging.error('Please set room homeserver in cli.yaml')
|
||||||
|
return False
|
||||||
|
|
||||||
room_alias = f'{cli_config['room']['prefix']}{args.pack_name}'
|
room_alias = f'{cli_config['room']['prefix']}{args.pack_name}'
|
||||||
if args.room:
|
if args.room:
|
||||||
|
@ -159,7 +177,8 @@ async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, conf
|
||||||
|
|
||||||
if room_alias.startswith("#"):
|
if room_alias.startswith("#"):
|
||||||
if not ":" in room_alias:
|
if not ":" in room_alias:
|
||||||
raise ValueError(f'Invalid room: "{room_alias}". it should be "#room:example.com"')
|
logging.error(f'Invalid room: "{room_alias}". it should be "#room:example.com"')
|
||||||
|
return False
|
||||||
try:
|
try:
|
||||||
room = await client.room_resolve_alias(room_alias)
|
room = await client.room_resolve_alias(room_alias)
|
||||||
room = room.room_id
|
room = room.room_id
|
||||||
|
@ -179,25 +198,29 @@ async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, conf
|
||||||
room = None
|
room = None
|
||||||
|
|
||||||
if not (args.create_room or cli_config['import']['create_room']) and not room:
|
if not (args.create_room or cli_config['import']['create_room']) and not room:
|
||||||
raise ValueError(f'Room "{room_alias}" does not exist. Use --create-room to create it.')
|
logging.error(f'Room "{room_alias}" does not exist. Use --create-room to create it.')
|
||||||
|
return False
|
||||||
|
|
||||||
if room:
|
if room:
|
||||||
return room
|
return room
|
||||||
|
|
||||||
space = args.space or cli_config['room']['space']
|
space = args.space or config['preview']['space']
|
||||||
if space:
|
if space:
|
||||||
if space.startswith("#"):
|
if space.startswith("#"):
|
||||||
if not ":" in space:
|
if not ":" in space:
|
||||||
raise ValueError(f'Invalid space: "{space}". it should be "#space:example.com"')
|
logging.error(f'Invalid space: "{space}". it should be "#space:example.com"')
|
||||||
|
return False
|
||||||
try:
|
try:
|
||||||
space = await client.room_resolve_alias(space)
|
space = await client.room_resolve_alias(space)
|
||||||
space = space.room_id
|
space = space.room_id
|
||||||
except:
|
except:
|
||||||
raise ValueError(f'Space "{space}" does not exist.')
|
logging.error(f'Space "{space}" does not exist.')
|
||||||
|
return False
|
||||||
try:
|
try:
|
||||||
await client.room_get_state_event(space, 'm.room.create')
|
await client.room_get_state_event(space, 'm.room.create')
|
||||||
except:
|
except:
|
||||||
raise ValueError(f'Space "{space}" does not exist.')
|
logging.error(f'Space "{space}" does not exist.')
|
||||||
|
return False
|
||||||
|
|
||||||
room_initial_state = []
|
room_initial_state = []
|
||||||
if space:
|
if space:
|
||||||
|
@ -219,7 +242,8 @@ async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, conf
|
||||||
initial_state=room_initial_state,
|
initial_state=room_initial_state,
|
||||||
)
|
)
|
||||||
except:
|
except:
|
||||||
raise ValueError(f'Failed to create room "{room_alias}: {room}"')
|
logging.error(f'Failed to create room "{room_alias}: {room}"')
|
||||||
|
return False
|
||||||
|
|
||||||
RoomPowerLevels = {
|
RoomPowerLevels = {
|
||||||
"users": {
|
"users": {
|
||||||
|
@ -271,6 +295,96 @@ async def create_or_get_room(args: argparse.Namespace, client: AsyncClient, conf
|
||||||
logging.info(f'Created room {room.room_id}, #{room_alias}:{cli_config["room"]["homeserver"]}')
|
logging.info(f'Created room {room.room_id}, #{room_alias}:{cli_config["room"]["homeserver"]}')
|
||||||
return room.room_id
|
return room.room_id
|
||||||
|
|
||||||
|
|
||||||
|
async def preview_stickerpack(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict):
|
||||||
|
|
||||||
|
if args.pack_name == "" and args.room == "":
|
||||||
|
logging.error('At least one of "pack-name" or "room" must be set')
|
||||||
|
return False
|
||||||
|
|
||||||
|
__preview_args = []
|
||||||
|
__pack_name = args.pack_name
|
||||||
|
if args.primary:
|
||||||
|
__pack_name = ""
|
||||||
|
if args.update_room:
|
||||||
|
__preview_args.append('-upd')
|
||||||
|
if args.artist != 'False' or cli_config['preview']['include_artist']:
|
||||||
|
__preview_args.append('-a')
|
||||||
|
if args.artist is not None and args.artist != 'False':
|
||||||
|
__preview_args.append(args.artist)
|
||||||
|
if args.artist_url != 'False' or cli_config['preview']['include_artist_url']:
|
||||||
|
__preview_args.append('-au')
|
||||||
|
if args.artist_url is not None and args.artist_url != 'False':
|
||||||
|
__preview_args.append(args.artist_url)
|
||||||
|
if args.space != 'False' or cli_config['preview']['not_include_space']:
|
||||||
|
__preview_args.append('-s')
|
||||||
|
if args.space is not None:
|
||||||
|
__preview_args.append(args.space)
|
||||||
|
if args.tg_url != 'False' or cli_config['preview']['include_tg_url']:
|
||||||
|
__preview_args.append('-tu')
|
||||||
|
if args.tg_url is not None and args.tg_url != 'False':
|
||||||
|
__preview_args.append(args.tg_url)
|
||||||
|
if args.preview_url != 'False' or cli_config['preview']['not_include_preview_url']:
|
||||||
|
__preview_args.append('-pu')
|
||||||
|
if args.preview_url is not None:
|
||||||
|
__preview_args.append(args.preview_url)
|
||||||
|
|
||||||
|
room = await get_room(args, client, config, cli_config)
|
||||||
|
if not room:
|
||||||
|
return False
|
||||||
|
|
||||||
|
previewer = MatrixPreview(client, AttrDict({'room_id': room}))
|
||||||
|
async for status in previewer.generate_stickerset_preview_to_room(__pack_name, __preview_args):
|
||||||
|
switch = {
|
||||||
|
MatrixPreview.STATUS_NO_PERMISSION: (
|
||||||
|
"I do not have permissions to update this room\n"
|
||||||
|
"Please, give me mod 🙏"
|
||||||
|
),
|
||||||
|
MatrixPreview.STATUS_PACK_NOT_EXISTS: (
|
||||||
|
f"Stickerpack '{__pack_name}' does not exists.\n"
|
||||||
|
"Please create it first."
|
||||||
|
),
|
||||||
|
MatrixPreview.STATUS_UPDATING_ROOM_STATE: f"Updating room state...",
|
||||||
|
}
|
||||||
|
text = switch.get(status, "Warning: Unknown status")
|
||||||
|
logging.info(text)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_room(args: argparse.Namespace, client: AsyncClient, config: dict, cli_config: dict):
|
||||||
|
room_alias = f'{cli_config['room']['prefix']}{args.pack_name}'
|
||||||
|
if args.room != "":
|
||||||
|
room_alias = args.room
|
||||||
|
|
||||||
|
if room_alias.startswith("#"):
|
||||||
|
if not ":" in room_alias:
|
||||||
|
logging.error(f'Invalid room: "{room_alias}". it should be "#room:example.com"')
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
room = await client.room_resolve_alias(room_alias)
|
||||||
|
room = room.room_id
|
||||||
|
except:
|
||||||
|
room = None
|
||||||
|
elif room_alias.startswith("!"):
|
||||||
|
room = room_alias
|
||||||
|
try:
|
||||||
|
await client.room_get_state_event(room, 'm.room.create')
|
||||||
|
except:
|
||||||
|
room = None
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
room = await client.room_resolve_alias(f"#{room_alias}:{cli_config['room']['homeserver']}")
|
||||||
|
room = room.room_id
|
||||||
|
except:
|
||||||
|
room = None
|
||||||
|
|
||||||
|
if not room:
|
||||||
|
logging.error(f'Room "{room_alias}" does not exist.')
|
||||||
|
return False
|
||||||
|
|
||||||
|
if room:
|
||||||
|
return room
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
if len(sys.argv)==1:
|
if len(sys.argv)==1:
|
|
@ -65,7 +65,7 @@ async def _parse_args(args: list) -> dict[str, str]:
|
||||||
if arg in ["-j", "--json"]:
|
if arg in ["-j", "--json"]:
|
||||||
parsed_args["json"] = not parsed_args["json"]
|
parsed_args["json"] = not parsed_args["json"]
|
||||||
if arg in ["-upd", "--update-pack"]:
|
if arg in ["-upd", "--update-pack"]:
|
||||||
parsed_args["json"] = not parsed_args["json"]
|
parsed_args["update_pack"] = not parsed_args["update_pack"]
|
||||||
|
|
||||||
return parsed_args
|
return parsed_args
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue