Merge pull request 'Refactor functions to a shared module' (#1) from Decouple_from_Flask into main

Reviewed-on: #1
This commit is contained in:
Kentai Radiquum 2025-05-15 19:30:53 +05:00
commit c78314fb0a
13 changed files with 479 additions and 222 deletions

View file

@ -52,8 +52,10 @@ export default function PackPage() {
const res = await fetch(PACK_ENDPOINT("getPack", id));
if (!res.ok) router.push("/404");
const data: Pack = await res.json();
setPackData(data);
setPackDataLoading(false);
if (data._id) {
setPackData(data);
setPackDataLoading(false);
}
}
useEffect(() => {

View file

@ -3,3 +3,4 @@ flaskwebgui
requests
Pillow
flask-socketio
tqdm

View file

@ -1,63 +1,15 @@
import os
from . import apiDownload
from flask import request, jsonify, send_file, redirect, url_for, abort
from flask import request, jsonify
from config import PACKS_FOLDER
import json
from flask_socketio import emit
import requests
def download(path, url, name, total):
r = requests.get(url, stream=True)
if r.status_code != 200:
emit(
"download_current",
{
"status": "error",
"message": f"Got a HTTP ERROR {r.status_code} while downloading {name}",
},
)
return {
"status": "error",
"message": f"Got a HTTP ERROR {r.status_code} while downloading {name}",
}
downloaded = 0
if os.path.exists(f"{path}/{name}"):
emit(
"download_current",
{"status": "ok", "message": f"{name} already downloaded"},
namespace="/",
broadcast=True,
)
return {"status": "ok", "message": f"{name} already downloaded"}
with open(f"{path}/{name}", "wb") as fp:
for data in r.iter_content(chunk_size=1024):
size = fp.write(data)
downloaded += size
emit(
"download_current",
{
"status": "pending",
"total_bytes": total,
"download_bytes": downloaded,
},
namespace="/",
broadcast=True,
)
emit(
"download_current",
{"status": "ok", "message": f"{name} downloaded"},
namespace="/",
broadcast=True,
)
return {
"status": "ok",
"message": f"{name} downloaded",
}
from shared.download import download
@apiDownload.route("/pack", methods=["POST"])
def downloadPack():
def downloadPackEndpoint():
pack = {}
pack_id = request.json.get("pack_id")
@ -66,11 +18,33 @@ def downloadPack():
fp.close()
mods = pack.get("mods", [])
total = len(mods)
queue = []
for mod in mods:
queue.append(
{
"slug": mod.get("slug"),
"title": mod.get("file").get("title"),
"url": mod.get("file").get("url"),
"filename": mod.get("file").get("filename"),
"size": mod.get("file").get("size"),
}
)
for dep in mod.get("dependencies"):
queue.append(
{
"slug": dep.get("slug"),
"title": dep.get("file").get("title"),
"url": dep.get("file").get("url"),
"filename": dep.get("file").get("filename"),
"size": dep.get("file").get("size"),
}
)
queue = list({mod["slug"]: mod for mod in queue}.values())
total = len(queue)
os.makedirs(f"{PACKS_FOLDER}/{pack_id}/mods", exist_ok=True)
for i, mod in enumerate(mods):
for i, mod in enumerate(queue):
emit(
"download_total",
{
@ -78,17 +52,37 @@ def downloadPack():
"total": total,
"current": i,
"title": mod.get("title"),
"filename": mod.get("file").get("filename"),
"filename": mod.get("filename"),
},
namespace="/",
broadcast=True,
)
download(
status, message = download(
f"{PACKS_FOLDER}/{pack_id}/mods",
mod.get("file").get("url"),
mod.get("file").get("filename"),
mod.get("file").get("size"),
mod.get("url"),
mod.get("filename"),
mod.get("size"),
)
if status is False:
emit(
"download_current",
{
"status": "error",
"message": message,
},
namespace="/",
broadcast=True,
)
else:
emit(
"download_current",
{
"status": "ok",
"message": mod.get("filename"),
},
namespace="/",
broadcast=True,
)
emit(
"download_total",
@ -111,7 +105,7 @@ def downloadPack():
@apiDownload.route("/mods", methods=["POST"])
def downloadMods():
def downloadModsEndpoint():
pack = {}
pack_id = request.json.get("pack_id")
mods_slugs = request.json.get("mods")
@ -121,31 +115,73 @@ def downloadMods():
fp.close()
mods = pack.get("mods", [])
total = len(mods_slugs)
queue = []
for slug in mods_slugs:
for mod in mods:
if mod.get("slug") == slug:
queue.append(
{
"slug": mod.get("slug"),
"title": mod.get("file").get("title"),
"url": mod.get("file").get("url"),
"filename": mod.get("file").get("filename"),
"size": mod.get("file").get("size"),
}
)
for dep in mod.get("dependencies"):
queue.append(
{
"slug": dep.get("slug"),
"title": dep.get("file").get("title"),
"url": dep.get("file").get("url"),
"filename": dep.get("file").get("filename"),
"size": dep.get("file").get("size"),
}
)
queue = list({mod["slug"]: mod for mod in queue}.values())
total = len(queue)
os.makedirs(f"{PACKS_FOLDER}/{pack_id}/mods", exist_ok=True)
for i, slug in enumerate(mods_slugs):
for mod in mods:
if mod.get("slug") == slug:
emit(
"download_total",
{
"status": "ok",
"total": total,
"current": i,
"title": mod.get("title"),
"filename": mod.get("file").get("filename"),
},
namespace="/",
broadcast=True,
)
download(
f"{PACKS_FOLDER}/{pack_id}/mods",
mod.get("file").get("url"),
mod.get("file").get("filename"),
mod.get("file").get("size"),
)
for i, mod in enumerate(queue):
emit(
"download_total",
{
"status": "ok",
"total": total,
"current": i,
"title": mod.get("title"),
"filename": mod.get("filename"),
},
namespace="/",
broadcast=True,
)
status, message = download(
f"{PACKS_FOLDER}/{pack_id}/mods",
mod.get("url"),
mod.get("filename"),
mod.get("size"),
)
if status is False:
emit(
"download_current",
{
"status": "error",
"message": message,
},
namespace="/",
broadcast=True,
)
else:
emit(
"download_current",
{
"status": "ok",
"message": mod.get("filename"),
},
namespace="/",
broadcast=True,
)
emit(
"download_total",
@ -162,6 +198,6 @@ def downloadMods():
return jsonify(
{
"status": "ok",
"message": f"download of {pack_id} with {total} mods finished",
"message": f"download of {total} mods finished",
},
)

View file

@ -1,26 +1,19 @@
import os
import re
from . import apiPack
from flask import request, jsonify, send_file, redirect, url_for, abort
from flask import request, jsonify, send_file, redirect, url_for
from config import PACKS_FOLDER, IMG_ALLOWED_MIME
from PIL import Image
from io import BytesIO
import base64
import json
from .source import Modrinth, CurseForge
from shared.pack import getPack, addMod, deleteMod
@apiPack.route("/<id>", methods=["GET"])
def getPack(id):
if not os.path.exists(f"{PACKS_FOLDER}/{id}/packfile.json"):
def getPackEndpoint(id):
pack = getPack(id)
if pack is None:
return jsonify({"status": "error", "message": "pack not found"}), 404
pack = {}
with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp:
pack = json.load(fp)
pack["_id"] = id
fp.close()
return jsonify(pack)
@ -63,73 +56,35 @@ def editPackImage(id):
@apiPack.route("/<id>/mod/add", methods=["POST"])
def addMod(id):
def addModEndpoint(id):
source = request.json.get("source", None)
slug = request.json.get("slug", None)
version = request.json.get("version", None)
pack = {}
mod = {}
mod = addMod(id, source, slug, version)
if not os.path.exists(f"{PACKS_FOLDER}/{id}/packfile.json"):
if mod == "err_404":
return jsonify({"status": "error", "message": "pack not found"})
with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp:
pack = json.load(fp)
fp.close()
mod_loader = pack.get("modloader").lower()
game_version = pack.get("version")
if not source:
elif mod == "err_source":
return jsonify({"status": "error", "message": "mod source is required"})
if not slug:
elif mod == "err_slug":
return jsonify({"status": "error", "message": "mod slug is required"})
for mod in pack["mods"]:
if mod.get("slug") == slug:
return jsonify({"status": "error", "message": "mod already exists"})
if source == "Modrinth":
mod = Modrinth.getModrinthMod(slug, version, mod_loader, game_version)
elif source == "CurseForge":
mod = CurseForge.getCurseForgeMod(slug, version, mod_loader, game_version)
if mod.get("status") != "ok":
return jsonify({"status": "error", "message": mod.get("message")})
pack["modpackVersion"] += 1
pack["mods"].append(mod.get("mod"))
with open(f"{PACKS_FOLDER}/{id}/packfile.json", mode="w", encoding="utf-8") as fp:
json.dump(pack, fp)
fp.close()
elif mod == "err_exists":
return jsonify({"status": "error", "message": "mod already exists"})
elif isinstance(mod, str):
return jsonify({"status": "error", "message": mod})
return jsonify(
{
"status": "ok",
"message": f"mod {mod.get("mod").get('title')} ({slug}) has been added",
"mod": mod.get("mod"),
"message": f"mod {mod.get("title")} ({slug}) has been added",
"mod": mod,
}
)
@apiPack.route("/<id>/mod/<slug>/delete", methods=["GET"])
def deleteMod(id, slug):
pack = {}
with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp:
pack = json.load(fp)
fp.close()
for mod in pack.get("mods"):
if mod.get("slug") == slug:
pack["mods"].remove(mod)
pack["modpackVersion"] += 1
if os.path.exists(
f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}"
):
os.remove(f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}")
with open(f"{PACKS_FOLDER}/{id}/packfile.json", mode="w", encoding="utf-8") as fp:
json.dump(pack, fp)
fp.close()
def deleteModEndpoint(id, slug):
deleteMod(id, slug)
return jsonify(
{
"status": "ok",
@ -138,34 +93,34 @@ def deleteMod(id, slug):
)
@apiPack.route("/<id>/mods/delete", methods=["POST"])
def deleteModBulk(id):
pack = {}
slugs = request.json
# @apiPack.route("/<id>/mods/delete", methods=["POST"])
# def deleteModBulk(id):
# pack = {}
# slugs = request.json
with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp:
pack = json.load(fp)
fp.close()
# with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp:
# pack = json.load(fp)
# fp.close()
for slug in slugs:
for mod in pack.get("mods"):
if mod.get("slug") == slug:
pack["mods"].remove(mod)
pack["modpackVersion"] += 1
if os.path.exists(
f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}"
):
os.remove(
f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}"
)
# for slug in slugs:
# for mod in pack.get("mods"):
# if mod.get("slug") == slug:
# pack["mods"].remove(mod)
# pack["modpackVersion"] += 1
# if os.path.exists(
# f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}"
# ):
# os.remove(
# f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}"
# )
with open(f"{PACKS_FOLDER}/{id}/packfile.json", mode="w", encoding="utf-8") as fp:
json.dump(pack, fp)
fp.close()
# with open(f"{PACKS_FOLDER}/{id}/packfile.json", mode="w", encoding="utf-8") as fp:
# json.dump(pack, fp)
# fp.close()
return jsonify(
{
"status": "ok",
"message": f"mods has been removed",
}
)
# return jsonify(
# {
# "status": "ok",
# "message": f"mods has been removed",
# }
# )

View file

@ -1,70 +1,37 @@
import os
from . import apiPacks
from flask import request, jsonify
from config import PACKS_FOLDER
import json
import shutil
from shared.packs import getPacks, createPack, deletePack
@apiPacks.route("/all", methods=["GET"])
def getPacks():
packs = []
if not os.path.exists(f"{PACKS_FOLDER}"):
os.makedirs(f"{PACKS_FOLDER}", exist_ok=True)
return jsonify(packs)
pack_folders = [f.name for f in os.scandir(PACKS_FOLDER) if f.is_dir()]
for pack_folder in pack_folders:
if not os.path.exists(f"{PACKS_FOLDER}/{pack_folder}/packfile.json"):
continue
with open(f"{PACKS_FOLDER}/{pack_folder}/packfile.json") as fp:
pack = json.load(fp)
pack["_id"] = pack_folder
packs.append(pack)
fp.close()
return jsonify(packs)
def getPacksEndpoint():
return jsonify(getPacks())
@apiPacks.route("/new", methods=["POST"])
def createPack():
pack = {
"formatVersion": 0,
"modpackVersion": 0,
"title": request.json.get("title"),
"author": request.json.get("author"),
"version": request.json.get("version"),
"modloader": request.json.get("modloader"),
"updateURL": "",
"mods": [],
}
title = pack.get("title").replace(" ", "_")
def createPackEndpoint():
pack, is_exists = createPack(
request.json.get("title"),
request.json.get("author"),
request.json.get("version"),
request.json.get("modloader"),
)
if os.path.exists(f"{PACKS_FOLDER}/{title}"):
if is_exists:
return jsonify({"status": "error", "message": "pack already exists"})
os.makedirs(f"{PACKS_FOLDER}/{title}", exist_ok=True)
with open(
os.path.abspath(f"{PACKS_FOLDER}/{title}/packfile.json"),
mode="w",
encoding="utf-8",
) as fp:
json.dump(pack, fp)
fp.close()
return jsonify(
{
"status": "ok",
"message": f"pack {pack.get('title')} created",
"id": title,
"message": f"pack {pack.title} created",
"id": pack._id,
}
)
@apiPacks.route("/<id>/delete", methods=["GET"])
def deletePack(id):
shutil.rmtree(f"{PACKS_FOLDER}/{id}")
def deletePackEndpoint(id):
deletePack(id)
return jsonify(
{
"status": "ok",

0
src/shared/__init__.py Normal file
View file

46
src/shared/download.py Normal file
View file

@ -0,0 +1,46 @@
import os
from flask_socketio import emit
import requests
from typing import Literal
from tqdm import tqdm
def download(
path, url, name, total
) -> tuple[Literal[True], None] | tuple[Literal[False], str]:
if os.path.exists(f"{path}/{name}"):
return True, None
r = requests.get(url, stream=True)
if r.status_code != 200:
return False, f"Got a HTTP ERROR {r.status_code} while downloading {name}"
totalBytes = int(r.headers.get("Content-Length", total))
if os.getenv("is_cli"):
with open(f"{path}/{name}", "wb") as fp, tqdm(
desc=name.ljust(40),
total=totalBytes,
miniters=100,
unit="b",
unit_scale=True,
unit_divisor=1024,
) as bar:
for data in r.iter_content(chunk_size=1024):
size = fp.write(data)
bar.update(size)
else:
downloaded = 0
with open(f"{path}/{name}", "wb") as fp:
for data in r.iter_content(chunk_size=1024):
size = fp.write(data)
downloaded += size
emit(
"download_current",
{
"status": "pending",
"total_bytes": totalBytes,
"download_bytes": downloaded,
},
namespace="/",
broadcast=True,
)
return True, None

91
src/shared/pack.py Normal file
View file

@ -0,0 +1,91 @@
import os
from config import PACKS_FOLDER
import json
from type.pack import Pack
from type.mod import Mod
from typing import Literal
from api.source import Modrinth, CurseForge
def getPack(id: str) -> Pack | None:
if not os.path.exists(f"{PACKS_FOLDER}/{id}/packfile.json"):
return None
pack: Pack = {}
with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp:
pack = json.load(fp)
pack["_id"] = id
fp.close()
return pack
def addMod(
pack_id: str, source: str, slug: str, version: str | None
) -> (
Mod
| Literal["err_404"]
| Literal["err_source"]
| Literal["err_slug"]
| Literal["err_exists"]
| str
):
pack: Pack = {}
mod: Mod = {}
if not os.path.exists(f"{PACKS_FOLDER}/{pack_id}/packfile.json"):
return "err_404"
with open(f"{PACKS_FOLDER}/{pack_id}/packfile.json") as fp:
pack = json.load(fp)
fp.close()
mod_loader = pack.get("modloader").lower()
game_version = pack.get("version")
if not source:
return "err_source"
if not slug:
return "err_slug"
for mod in pack["mods"]:
if mod.get("slug") == slug:
return "err_exists"
if source == "Modrinth":
mod = Modrinth.getModrinthMod(slug, version, mod_loader, game_version)
elif source == "CurseForge":
mod = CurseForge.getCurseForgeMod(slug, version, mod_loader, game_version)
if mod.get("status") != "ok":
return mod.get("message")
pack["modpackVersion"] += 1
pack["mods"].append(mod.get("mod"))
with open(
f"{PACKS_FOLDER}/{pack_id}/packfile.json", mode="w", encoding="utf-8"
) as fp:
json.dump(pack, fp)
fp.close()
return mod.get("mod")
def deleteMod(id: str, slug: str) -> Literal[True]:
pack = {}
with open(f"{PACKS_FOLDER}/{id}/packfile.json") as fp:
pack = json.load(fp)
fp.close()
for mod in pack.get("mods"):
if mod.get("slug") == slug:
pack["mods"].remove(mod)
pack["modpackVersion"] += 1
if os.path.exists(
f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}"
):
os.remove(f"{PACKS_FOLDER}/{id}/mods/{mod.get('file').get('filename')}")
with open(f"{PACKS_FOLDER}/{id}/packfile.json", mode="w", encoding="utf-8") as fp:
json.dump(pack, fp)
fp.close()
return True

61
src/shared/packs.py Normal file
View file

@ -0,0 +1,61 @@
import os
from config import PACKS_FOLDER
import json
import shutil
from type.pack import Pack
def getPacks() -> list[Pack]:
"""
Lists and returns all available packs from PACKS_FOLDER directory defined in config.py
"""
packs: list[Pack] = []
if not os.path.exists(f"{PACKS_FOLDER}"):
os.makedirs(f"{PACKS_FOLDER}", exist_ok=True)
return packs
pack_folders = [f.name for f in os.scandir(PACKS_FOLDER) if f.is_dir()]
for pack_folder in pack_folders:
if not os.path.exists(f"{PACKS_FOLDER}/{pack_folder}/packfile.json"):
continue
with open(f"{PACKS_FOLDER}/{pack_folder}/packfile.json") as fp:
pack = json.load(fp)
pack["_id"] = pack_folder
packs.append(pack)
fp.close()
return packs
def createPack(
title: str, author: str, game_version: str, mod_loader: str
) -> tuple[Pack, bool]:
"""
Creates a new pack.
If pack exists returns tuple[Pack, True], if pack was created returns tuple[Pack, False]
"""
pack = Pack(
title.replace(" ", "_"), title, author, game_version, mod_loader, "", [], 0, 0
)
if os.path.exists(f"{PACKS_FOLDER}/{pack._id}"):
return pack, True
os.makedirs(f"{PACKS_FOLDER}/{pack._id}", exist_ok=True)
with open(
os.path.abspath(f"{PACKS_FOLDER}/{pack._id}/packfile.json"),
mode="w",
encoding="utf-8",
) as fp:
json.dump(pack.json(), fp)
fp.close()
return pack, False
def deletePack(id):
shutil.rmtree(f"{PACKS_FOLDER}/{id}")
return True

0
src/type/__init__.py Normal file
View file

21
src/type/file.py Normal file
View file

@ -0,0 +1,21 @@
import json
class ModFile:
def __init__(
self, version: str, hashes: dict[str, str], url: str, filename: str, size: int
):
self.version = version
self.hashes = hashes
self.url = url
self.filename = filename
self.size = size
def json(self):
return {
"version": self.version,
"hashes": self.hashes,
"url": self.url,
"filename": self.filename,
"size": self.size,
}

39
src/type/mod.py Normal file
View file

@ -0,0 +1,39 @@
import json
from .file import ModFile
class Mod:
def __init__(
self,
slug: str,
project_id: str,
icon: str,
title: str,
developers: list[str],
source: str,
environment: dict[str, bool],
dependencies: list,
file: ModFile,
):
self.slug = slug
self.project_id = project_id
self.icon = icon
self.title = title
self.developers = developers
self.source = source
self.environment = environment
self.dependencies = dependencies
self.file = file
def json(self):
return {
"slug": self.slug,
"project_id": self.project_id,
"icon": self.icon,
"title": self.title,
"developers": self.developers,
"source": self.source,
"environment": self.environment,
"dependencies": self.dependencies,
"file": self.file,
}

38
src/type/pack.py Normal file
View file

@ -0,0 +1,38 @@
import json
from .mod import Mod
class Pack:
def __init__(
self,
_id: str,
title: str,
author: str,
version: str,
modloader: str,
updateURL: str,
mods: list[Mod],
modpackVersion: int = 0,
formatVersion: int = 0,
):
self._id = _id
self.title = title
self.author = author
self.version = version
self.modloader = modloader
self.updateURL = updateURL
self.mods = mods
self.modpackVersion = modpackVersion
self.formatVersion = formatVersion
def json(self):
return {
"title": self.title,
"author": self.author,
"version": self.version,
"modloader": self.modloader,
"updateURL": self.updateURL,
"mods": self.mods,
"modpackVersion": self.modpackVersion,
"formatVersion": self.formatVersion,
}