From f4d15c5eaf89c61ac5391a2fac7262ae57ea7180 Mon Sep 17 00:00:00 2001 From: Radiquum Date: Wed, 14 May 2025 23:49:01 +0500 Subject: [PATCH 1/3] add types and decouple packs functions from flask --- src/api/packs.py | 63 ++++++++++-------------------------------- src/shared/__init__.py | 0 src/shared/packs.py | 61 ++++++++++++++++++++++++++++++++++++++++ src/type/__init__.py | 0 src/type/file.py | 21 ++++++++++++++ src/type/mod.py | 39 ++++++++++++++++++++++++++ src/type/pack.py | 38 +++++++++++++++++++++++++ 7 files changed, 174 insertions(+), 48 deletions(-) create mode 100644 src/shared/__init__.py create mode 100644 src/shared/packs.py create mode 100644 src/type/__init__.py create mode 100644 src/type/file.py create mode 100644 src/type/mod.py create mode 100644 src/type/pack.py diff --git a/src/api/packs.py b/src/api/packs.py index cc2f5df..611d8a6 100644 --- a/src/api/packs.py +++ b/src/api/packs.py @@ -1,70 +1,37 @@ -import os from . import apiPacks from flask import request, jsonify -from config import PACKS_FOLDER -import json -import shutil +from shared.packs import getPacks, createPack, deletePack @apiPacks.route("/all", methods=["GET"]) -def getPacks(): - packs = [] - - if not os.path.exists(f"{PACKS_FOLDER}"): - os.makedirs(f"{PACKS_FOLDER}", exist_ok=True) - return jsonify(packs) - - pack_folders = [f.name for f in os.scandir(PACKS_FOLDER) if f.is_dir()] - for pack_folder in pack_folders: - if not os.path.exists(f"{PACKS_FOLDER}/{pack_folder}/packfile.json"): - continue - with open(f"{PACKS_FOLDER}/{pack_folder}/packfile.json") as fp: - pack = json.load(fp) - pack["_id"] = pack_folder - packs.append(pack) - fp.close() - return jsonify(packs) +def getPacksEndpoint(): + return jsonify(getPacks()) @apiPacks.route("/new", methods=["POST"]) -def createPack(): - pack = { - "formatVersion": 0, - "modpackVersion": 0, - "title": request.json.get("title"), - "author": request.json.get("author"), - "version": request.json.get("version"), - "modloader": request.json.get("modloader"), - "updateURL": "", - "mods": [], - } - title = pack.get("title").replace(" ", "_") +def createPackEndpoint(): + pack, is_exists = createPack( + request.json.get("title"), + request.json.get("author"), + request.json.get("version"), + request.json.get("modloader"), + ) - if os.path.exists(f"{PACKS_FOLDER}/{title}"): + if is_exists: return jsonify({"status": "error", "message": "pack already exists"}) - os.makedirs(f"{PACKS_FOLDER}/{title}", exist_ok=True) - - with open( - os.path.abspath(f"{PACKS_FOLDER}/{title}/packfile.json"), - mode="w", - encoding="utf-8", - ) as fp: - json.dump(pack, fp) - fp.close() - return jsonify( { "status": "ok", - "message": f"pack {pack.get('title')} created", - "id": title, + "message": f"pack {pack.title} created", + "id": pack._id, } ) @apiPacks.route("//delete", methods=["GET"]) -def deletePack(id): - shutil.rmtree(f"{PACKS_FOLDER}/{id}") +def deletePackEndpoint(id): + deletePack(id) return jsonify( { "status": "ok", diff --git a/src/shared/__init__.py b/src/shared/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/shared/packs.py b/src/shared/packs.py new file mode 100644 index 0000000..385106f --- /dev/null +++ b/src/shared/packs.py @@ -0,0 +1,61 @@ +import os +from config import PACKS_FOLDER +import json +import shutil + +from type.pack import Pack + + +def getPacks() -> list[Pack]: + """ + Lists and returns all available packs from PACKS_FOLDER directory defined in config.py + """ + packs: list[Pack] = [] + + if not os.path.exists(f"{PACKS_FOLDER}"): + os.makedirs(f"{PACKS_FOLDER}", exist_ok=True) + return packs + + pack_folders = [f.name for f in os.scandir(PACKS_FOLDER) if f.is_dir()] + for pack_folder in pack_folders: + if not os.path.exists(f"{PACKS_FOLDER}/{pack_folder}/packfile.json"): + continue + with open(f"{PACKS_FOLDER}/{pack_folder}/packfile.json") as fp: + pack = json.load(fp) + pack["_id"] = pack_folder + packs.append(pack) + fp.close() + + return packs + + +def createPack( + title: str, author: str, game_version: str, mod_loader: str +) -> Pack | bool: + """ + Creates a new pack. + If pack exists returns tuple[Pack, True], if pack was created returns tuple[Pack, False] + """ + pack = Pack( + title.replace(" ", "_"), title, author, game_version, mod_loader, "", [], 0, 0 + ) + + if os.path.exists(f"{PACKS_FOLDER}/{pack._id}"): + return pack, True + + os.makedirs(f"{PACKS_FOLDER}/{pack._id}", exist_ok=True) + + with open( + os.path.abspath(f"{PACKS_FOLDER}/{pack._id}/packfile.json"), + mode="w", + encoding="utf-8", + ) as fp: + json.dump(pack.json(), fp) + fp.close() + + return pack, False + + +def deletePack(id): + shutil.rmtree(f"{PACKS_FOLDER}/{id}") + return True diff --git a/src/type/__init__.py b/src/type/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/type/file.py b/src/type/file.py new file mode 100644 index 0000000..02b9ab6 --- /dev/null +++ b/src/type/file.py @@ -0,0 +1,21 @@ +import json + + +class ModFile: + def __init__( + self, version: str, hashes: dict[str, str], url: str, filename: str, size: int + ): + self.version = version + self.hashes = hashes + self.url = url + self.filename = filename + self.size = size + + def json(self): + return { + "version": self.version, + "hashes": self.hashes, + "url": self.url, + "filename": self.filename, + "size": self.size, + } diff --git a/src/type/mod.py b/src/type/mod.py new file mode 100644 index 0000000..63ec543 --- /dev/null +++ b/src/type/mod.py @@ -0,0 +1,39 @@ +import json +from .file import ModFile + + +class Mod: + def __init__( + self, + slug: str, + project_id: str, + icon: str, + title: str, + developers: list[str], + source: str, + environment: dict[str, bool], + dependencies: list, + file: ModFile, + ): + self.slug = slug + self.project_id = project_id + self.icon = icon + self.title = title + self.developers = developers + self.source = source + self.environment = environment + self.dependencies = dependencies + self.file = file + + def json(self): + return { + "slug": self.slug, + "project_id": self.project_id, + "icon": self.icon, + "title": self.title, + "developers": self.developers, + "source": self.source, + "environment": self.environment, + "dependencies": self.dependencies, + "file": self.file, + } diff --git a/src/type/pack.py b/src/type/pack.py new file mode 100644 index 0000000..d0c1262 --- /dev/null +++ b/src/type/pack.py @@ -0,0 +1,38 @@ +import json +from .mod import Mod + + +class Pack: + def __init__( + self, + _id: str, + title: str, + author: str, + version: str, + modloader: str, + updateURL: str, + mods: list[Mod], + modpackVersion: int = 0, + formatVersion: int = 0, + ): + self._id = _id + self.title = title + self.author = author + self.version = version + self.modloader = modloader + self.updateURL = updateURL + self.mods = mods + self.modpackVersion = modpackVersion + self.formatVersion = formatVersion + + def json(self): + return { + "title": self.title, + "author": self.author, + "version": self.version, + "modloader": self.modloader, + "updateURL": self.updateURL, + "mods": self.mods, + "modpackVersion": self.modpackVersion, + "formatVersion": self.formatVersion, + } From 69fb1584e5c6ba0357946c8c40f2211a1eea36ee Mon Sep 17 00:00:00 2001 From: Radiquum Date: Thu, 15 May 2025 00:30:20 +0500 Subject: [PATCH 2/3] decouple pack functions from flask --- gui/app/pack/page.tsx | 6 +- src/api/pack.py | 135 ++++++++++++++---------------------------- src/shared/pack.py | 91 ++++++++++++++++++++++++++++ src/shared/packs.py | 2 +- 4 files changed, 141 insertions(+), 93 deletions(-) create mode 100644 src/shared/pack.py diff --git a/gui/app/pack/page.tsx b/gui/app/pack/page.tsx index 867e95d..4672a1c 100644 --- a/gui/app/pack/page.tsx +++ b/gui/app/pack/page.tsx @@ -52,8 +52,10 @@ export default function PackPage() { const res = await fetch(PACK_ENDPOINT("getPack", id)); if (!res.ok) router.push("/404"); const data: Pack = await res.json(); - setPackData(data); - setPackDataLoading(false); + if (data._id) { + setPackData(data); + setPackDataLoading(false); + } } useEffect(() => { diff --git a/src/api/pack.py b/src/api/pack.py index 54ecab2..50e3979 100644 --- a/src/api/pack.py +++ b/src/api/pack.py @@ -1,26 +1,19 @@ import os import re from . import apiPack -from flask import request, jsonify, send_file, redirect, url_for, abort +from flask import request, jsonify, send_file, redirect, url_for from config import PACKS_FOLDER, IMG_ALLOWED_MIME from PIL import Image from io import BytesIO import base64 -import json -from .source import Modrinth, CurseForge +from shared.pack import getPack, addMod, deleteMod @apiPack.route("/", methods=["GET"]) -def getPack(id): - if not os.path.exists(f"{PACKS_FOLDER}/{id}/packfile.json"): +def getPackEndpoint(id): + pack = getPack(id) + if pack is None: return jsonify({"status": "error", "message": "pack not found"}), 404 - - pack = {} - with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp: - pack = json.load(fp) - pack["_id"] = id - fp.close() - return jsonify(pack) @@ -63,73 +56,35 @@ def editPackImage(id): @apiPack.route("//mod/add", methods=["POST"]) -def addMod(id): +def addModEndpoint(id): source = request.json.get("source", None) slug = request.json.get("slug", None) version = request.json.get("version", None) - pack = {} - mod = {} + mod = addMod(id, source, slug, version) - if not os.path.exists(f"{PACKS_FOLDER}/{id}/packfile.json"): + if mod == "err_404": return jsonify({"status": "error", "message": "pack not found"}) - with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp: - pack = json.load(fp) - fp.close() - mod_loader = pack.get("modloader").lower() - game_version = pack.get("version") - - if not source: + elif mod == "err_source": return jsonify({"status": "error", "message": "mod source is required"}) - if not slug: + elif mod == "err_slug": return jsonify({"status": "error", "message": "mod slug is required"}) - - for mod in pack["mods"]: - if mod.get("slug") == slug: - return jsonify({"status": "error", "message": "mod already exists"}) - - if source == "Modrinth": - mod = Modrinth.getModrinthMod(slug, version, mod_loader, game_version) - elif source == "CurseForge": - mod = CurseForge.getCurseForgeMod(slug, version, mod_loader, game_version) - - if mod.get("status") != "ok": - return jsonify({"status": "error", "message": mod.get("message")}) - - pack["modpackVersion"] += 1 - pack["mods"].append(mod.get("mod")) - with open(f"{PACKS_FOLDER}/{id}/packfile.json", mode="w", encoding="utf-8") as fp: - json.dump(pack, fp) - fp.close() + elif mod == "err_exists": + return jsonify({"status": "error", "message": "mod already exists"}) + elif isinstance(mod, str): + return jsonify({"status": "error", "message": mod}) return jsonify( { "status": "ok", - "message": f"mod {mod.get("mod").get('title')} ({slug}) has been added", - "mod": mod.get("mod"), + "message": f"mod {mod.get("title")} ({slug}) has been added", + "mod": mod, } ) @apiPack.route("//mod//delete", methods=["GET"]) -def deleteMod(id, slug): - pack = {} - with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp: - pack = json.load(fp) - fp.close() - - for mod in pack.get("mods"): - if mod.get("slug") == slug: - pack["mods"].remove(mod) - pack["modpackVersion"] += 1 - if os.path.exists( - f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}" - ): - os.remove(f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}") - - with open(f"{PACKS_FOLDER}/{id}/packfile.json", mode="w", encoding="utf-8") as fp: - json.dump(pack, fp) - fp.close() - +def deleteModEndpoint(id, slug): + deleteMod(id, slug) return jsonify( { "status": "ok", @@ -138,34 +93,34 @@ def deleteMod(id, slug): ) -@apiPack.route("//mods/delete", methods=["POST"]) -def deleteModBulk(id): - pack = {} - slugs = request.json +# @apiPack.route("//mods/delete", methods=["POST"]) +# def deleteModBulk(id): +# pack = {} +# slugs = request.json - with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp: - pack = json.load(fp) - fp.close() +# with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp: +# pack = json.load(fp) +# fp.close() - for slug in slugs: - for mod in pack.get("mods"): - if mod.get("slug") == slug: - pack["mods"].remove(mod) - pack["modpackVersion"] += 1 - if os.path.exists( - f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}" - ): - os.remove( - f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}" - ) +# for slug in slugs: +# for mod in pack.get("mods"): +# if mod.get("slug") == slug: +# pack["mods"].remove(mod) +# pack["modpackVersion"] += 1 +# if os.path.exists( +# f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}" +# ): +# os.remove( +# f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}" +# ) - with open(f"{PACKS_FOLDER}/{id}/packfile.json", mode="w", encoding="utf-8") as fp: - json.dump(pack, fp) - fp.close() +# with open(f"{PACKS_FOLDER}/{id}/packfile.json", mode="w", encoding="utf-8") as fp: +# json.dump(pack, fp) +# fp.close() - return jsonify( - { - "status": "ok", - "message": f"mods has been removed", - } - ) +# return jsonify( +# { +# "status": "ok", +# "message": f"mods has been removed", +# } +# ) diff --git a/src/shared/pack.py b/src/shared/pack.py new file mode 100644 index 0000000..5555241 --- /dev/null +++ b/src/shared/pack.py @@ -0,0 +1,91 @@ +import os +from config import PACKS_FOLDER +import json +from type.pack import Pack +from type.mod import Mod +from typing import Literal +from api.source import Modrinth, CurseForge + + +def getPack(id: str) -> Pack | None: + if not os.path.exists(f"{PACKS_FOLDER}/{id}/packfile.json"): + return None + + pack: Pack = {} + with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp: + pack = json.load(fp) + pack["_id"] = id + fp.close() + + return pack + + +def addMod( + pack_id: str, source: str, slug: str, version: str | None +) -> ( + Mod + | Literal["err_404"] + | Literal["err_source"] + | Literal["err_slug"] + | Literal["err_exists"] + | str +): + pack: Pack = {} + mod: Mod = {} + + if not os.path.exists(f"{PACKS_FOLDER}/{pack_id}/packfile.json"): + return "err_404" + with open(f"{PACKS_FOLDER}/{pack_id}/packfile.json") as fp: + pack = json.load(fp) + fp.close() + mod_loader = pack.get("modloader").lower() + game_version = pack.get("version") + + if not source: + return "err_source" + if not slug: + return "err_slug" + + for mod in pack["mods"]: + if mod.get("slug") == slug: + return "err_exists" + + if source == "Modrinth": + mod = Modrinth.getModrinthMod(slug, version, mod_loader, game_version) + elif source == "CurseForge": + mod = CurseForge.getCurseForgeMod(slug, version, mod_loader, game_version) + + if mod.get("status") != "ok": + return mod.get("message") + + pack["modpackVersion"] += 1 + pack["mods"].append(mod.get("mod")) + with open( + f"{PACKS_FOLDER}/{pack_id}/packfile.json", mode="w", encoding="utf-8" + ) as fp: + json.dump(pack, fp) + fp.close() + + return mod.get("mod") + + +def deleteMod(id: str, slug: str) -> Literal[True]: + pack = {} + with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp: + pack = json.load(fp) + fp.close() + + for mod in pack.get("mods"): + if mod.get("slug") == slug: + pack["mods"].remove(mod) + pack["modpackVersion"] += 1 + if os.path.exists( + f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}" + ): + os.remove(f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}") + + with open(f"{PACKS_FOLDER}/{id}/packfile.json", mode="w", encoding="utf-8") as fp: + json.dump(pack, fp) + fp.close() + + return True diff --git a/src/shared/packs.py b/src/shared/packs.py index 385106f..d330ac1 100644 --- a/src/shared/packs.py +++ b/src/shared/packs.py @@ -31,7 +31,7 @@ def getPacks() -> list[Pack]: def createPack( title: str, author: str, game_version: str, mod_loader: str -) -> Pack | bool: +) -> tuple[Pack, bool]: """ Creates a new pack. If pack exists returns tuple[Pack, True], if pack was created returns tuple[Pack, False] From da5158db56c1940990ac663c8232821041d79aa0 Mon Sep 17 00:00:00 2001 From: Radiquum Date: Thu, 15 May 2025 19:27:35 +0500 Subject: [PATCH 3/3] refactor: download login, factor out the download function --- requirements.txt | 1 + src/api/download.py | 200 ++++++++++++++++++++++++----------------- src/shared/download.py | 46 ++++++++++ 3 files changed, 165 insertions(+), 82 deletions(-) create mode 100644 src/shared/download.py diff --git a/requirements.txt b/requirements.txt index d81b154..2f2d05c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ flaskwebgui requests Pillow flask-socketio +tqdm \ No newline at end of file diff --git a/src/api/download.py b/src/api/download.py index 2b96ac2..eb4ffd0 100644 --- a/src/api/download.py +++ b/src/api/download.py @@ -1,63 +1,15 @@ import os from . import apiDownload -from flask import request, jsonify, send_file, redirect, url_for, abort +from flask import request, jsonify from config import PACKS_FOLDER import json from flask_socketio import emit -import requests - -def download(path, url, name, total): - r = requests.get(url, stream=True) - if r.status_code != 200: - emit( - "download_current", - { - "status": "error", - "message": f"Got a HTTP ERROR {r.status_code} while downloading {name}", - }, - ) - return { - "status": "error", - "message": f"Got a HTTP ERROR {r.status_code} while downloading {name}", - } - downloaded = 0 - if os.path.exists(f"{path}/{name}"): - emit( - "download_current", - {"status": "ok", "message": f"{name} already downloaded"}, - namespace="/", - broadcast=True, - ) - return {"status": "ok", "message": f"{name} already downloaded"} - with open(f"{path}/{name}", "wb") as fp: - for data in r.iter_content(chunk_size=1024): - size = fp.write(data) - downloaded += size - emit( - "download_current", - { - "status": "pending", - "total_bytes": total, - "download_bytes": downloaded, - }, - namespace="/", - broadcast=True, - ) - emit( - "download_current", - {"status": "ok", "message": f"{name} downloaded"}, - namespace="/", - broadcast=True, - ) - return { - "status": "ok", - "message": f"{name} downloaded", - } +from shared.download import download @apiDownload.route("/pack", methods=["POST"]) -def downloadPack(): +def downloadPackEndpoint(): pack = {} pack_id = request.json.get("pack_id") @@ -66,11 +18,33 @@ def downloadPack(): fp.close() mods = pack.get("mods", []) - total = len(mods) + queue = [] + for mod in mods: + queue.append( + { + "slug": mod.get("slug"), + "title": mod.get("file").get("title"), + "url": mod.get("file").get("url"), + "filename": mod.get("file").get("filename"), + "size": mod.get("file").get("size"), + } + ) + for dep in mod.get("dependencies"): + queue.append( + { + "slug": dep.get("slug"), + "title": dep.get("file").get("title"), + "url": dep.get("file").get("url"), + "filename": dep.get("file").get("filename"), + "size": dep.get("file").get("size"), + } + ) + queue = list({mod["slug"]: mod for mod in queue}.values()) + total = len(queue) os.makedirs(f"{PACKS_FOLDER}/{pack_id}/mods", exist_ok=True) - for i, mod in enumerate(mods): + for i, mod in enumerate(queue): emit( "download_total", { @@ -78,17 +52,37 @@ def downloadPack(): "total": total, "current": i, "title": mod.get("title"), - "filename": mod.get("file").get("filename"), + "filename": mod.get("filename"), }, namespace="/", broadcast=True, ) - download( + status, message = download( f"{PACKS_FOLDER}/{pack_id}/mods", - mod.get("file").get("url"), - mod.get("file").get("filename"), - mod.get("file").get("size"), + mod.get("url"), + mod.get("filename"), + mod.get("size"), ) + if status is False: + emit( + "download_current", + { + "status": "error", + "message": message, + }, + namespace="/", + broadcast=True, + ) + else: + emit( + "download_current", + { + "status": "ok", + "message": mod.get("filename"), + }, + namespace="/", + broadcast=True, + ) emit( "download_total", @@ -111,7 +105,7 @@ def downloadPack(): @apiDownload.route("/mods", methods=["POST"]) -def downloadMods(): +def downloadModsEndpoint(): pack = {} pack_id = request.json.get("pack_id") mods_slugs = request.json.get("mods") @@ -121,31 +115,73 @@ def downloadMods(): fp.close() mods = pack.get("mods", []) - total = len(mods_slugs) + queue = [] + for slug in mods_slugs: + for mod in mods: + if mod.get("slug") == slug: + queue.append( + { + "slug": mod.get("slug"), + "title": mod.get("file").get("title"), + "url": mod.get("file").get("url"), + "filename": mod.get("file").get("filename"), + "size": mod.get("file").get("size"), + } + ) + for dep in mod.get("dependencies"): + queue.append( + { + "slug": dep.get("slug"), + "title": dep.get("file").get("title"), + "url": dep.get("file").get("url"), + "filename": dep.get("file").get("filename"), + "size": dep.get("file").get("size"), + } + ) + queue = list({mod["slug"]: mod for mod in queue}.values()) + total = len(queue) os.makedirs(f"{PACKS_FOLDER}/{pack_id}/mods", exist_ok=True) - for i, slug in enumerate(mods_slugs): - for mod in mods: - if mod.get("slug") == slug: - emit( - "download_total", - { - "status": "ok", - "total": total, - "current": i, - "title": mod.get("title"), - "filename": mod.get("file").get("filename"), - }, - namespace="/", - broadcast=True, - ) - download( - f"{PACKS_FOLDER}/{pack_id}/mods", - mod.get("file").get("url"), - mod.get("file").get("filename"), - mod.get("file").get("size"), - ) + for i, mod in enumerate(queue): + emit( + "download_total", + { + "status": "ok", + "total": total, + "current": i, + "title": mod.get("title"), + "filename": mod.get("filename"), + }, + namespace="/", + broadcast=True, + ) + status, message = download( + f"{PACKS_FOLDER}/{pack_id}/mods", + mod.get("url"), + mod.get("filename"), + mod.get("size"), + ) + if status is False: + emit( + "download_current", + { + "status": "error", + "message": message, + }, + namespace="/", + broadcast=True, + ) + else: + emit( + "download_current", + { + "status": "ok", + "message": mod.get("filename"), + }, + namespace="/", + broadcast=True, + ) emit( "download_total", @@ -162,6 +198,6 @@ def downloadMods(): return jsonify( { "status": "ok", - "message": f"download of {pack_id} with {total} mods finished", + "message": f"download of {total} mods finished", }, ) diff --git a/src/shared/download.py b/src/shared/download.py new file mode 100644 index 0000000..f4496c6 --- /dev/null +++ b/src/shared/download.py @@ -0,0 +1,46 @@ +import os +from flask_socketio import emit +import requests +from typing import Literal +from tqdm import tqdm + + +def download( + path, url, name, total +) -> tuple[Literal[True], None] | tuple[Literal[False], str]: + if os.path.exists(f"{path}/{name}"): + return True, None + r = requests.get(url, stream=True) + if r.status_code != 200: + return False, f"Got a HTTP ERROR {r.status_code} while downloading {name}" + + totalBytes = int(r.headers.get("Content-Length", total)) + if os.getenv("is_cli"): + with open(f"{path}/{name}", "wb") as fp, tqdm( + desc=name.ljust(40), + total=totalBytes, + miniters=100, + unit="b", + unit_scale=True, + unit_divisor=1024, + ) as bar: + for data in r.iter_content(chunk_size=1024): + size = fp.write(data) + bar.update(size) + else: + downloaded = 0 + with open(f"{path}/{name}", "wb") as fp: + for data in r.iter_content(chunk_size=1024): + size = fp.write(data) + downloaded += size + emit( + "download_current", + { + "status": "pending", + "total_bytes": totalBytes, + "download_bytes": downloaded, + }, + namespace="/", + broadcast=True, + ) + return True, None