diff --git a/wau/addons.py b/wau/addons.py index 076dd30..9fcf217 100644 --- a/wau/addons.py +++ b/wau/addons.py @@ -65,11 +65,15 @@ class Addon: raise ValueError(f"No AddOn provider for {self}") def download(self) -> bool: - logger.info("Downloading %s", self.url) - self.create_download_dir() - provider = self.get_provider() - changed = provider.download(self) - return changed + try: + logger.info("Downloading %s", self.url) + self.create_download_dir() + provider = self.get_provider() + changed = provider.download(self) + return changed + except Exception as e: + logger.exception(e) + return False def install(self) -> None: logger.info("Installing %s", self) diff --git a/wau/http.py b/wau/http.py index cb5c2d5..c8bb0cb 100644 --- a/wau/http.py +++ b/wau/http.py @@ -9,6 +9,7 @@ from functools import lru_cache from http.client import HTTPResponse from pathlib import Path from typing import Mapping +from typing import Optional from urllib.request import Request @@ -43,17 +44,20 @@ class Response: return json.loads(self.bytes) -def open(url: str, params: Mapping = None) -> Response: +def open(url: str, params: Mapping = None, headers: Optional[Mapping] = None) -> Response: while True: try: if params is not None: url += "?" + urllib.parse.urlencode(params) request = Request(url) request.add_header("User-Agent", HTTP_USER_AGENT) + if headers is not None: + for header_key, header_value in headers.items(): + request.add_header(header_key, header_value) http_response = urllib.request.urlopen(request, timeout=HTTP_TIMEOUT) return Response(http_response) except TimeoutError as e: - logger.exception("Timeout", exc_info=e) + print(e) def download_zip(url: str, dest: Path) -> None: diff --git a/wau/providers/curseforge.py b/wau/providers/curseforge.py index b5a7728..6bc92c8 100644 --- a/wau/providers/curseforge.py +++ b/wau/providers/curseforge.py @@ -1,4 +1,7 @@ import logging +import os +import urllib.error +from typing import Dict from .web import Web from .. import http @@ -8,7 +11,16 @@ logger = logging.getLogger(__name__) class CurseForge(Web): - api_url = "https://addons-ecs.forgesvc.net" + api_url = "https://api.curseforge.com" + + @classmethod + def headers(cls) -> Dict[str, str]: + api_key = os.getenv("CURSE_API_KEY") + if not api_key: + logger.warning("No CURSE_API_KEY!") + return { + "x-api-key": api_key, + } @classmethod def is_supported(cls, url: str) -> bool: @@ -43,14 +55,15 @@ class CurseForge(Web): slug.replace("-", " ") )) addons = http.open( - url=f"{cls.api_url}/api/v2/addon/search", + url=f"{cls.api_url}/v1/mods/search", + headers=cls.headers(), params={ "gameId": 1, # World of Warcraft "pageSize": 50, "searchFilter": query } ).json() - for a in addons: + for a in addons["data"]: if a["slug"] == slug: addon.provider_data["curse_id"] = a["id"] addon.name = a["name"] @@ -58,27 +71,41 @@ class CurseForge(Web): raise ValueError("AddOn slug not found in CurseForge search results.") @classmethod - def _get_curse_addon_info(cls, addon: Addon) -> dict: + def _get_latest_file_url(cls, addon: Addon) -> str: curse_id = cls._get_curse_id(addon) logger.debug("Getting latest AddOn info from CurseForge") - return http.open(f"{cls.api_url}/api/v2/addon/{curse_id}").json() + files = http.open(f"{cls.api_url}/v1/mods/{curse_id}/files", headers=cls.headers()).json()["data"] - @classmethod - def _get_latest_file_url(cls, addon: Addon) -> str: - info = cls._get_curse_addon_info(addon) - flavor_priorities = [ - "wow_burning_crusade", - "wow_classic", - "wow_retail", - ] - files = sorted( - info["latestFiles"], - key=lambda f: ( - flavor_priorities.index(f["gameVersionFlavor"]), - f["releaseType"] # releaseType: 1: release, 2: beta, 3: alpha - ), - ) + def key(index: int, file: dict) -> tuple: + version_priorities = [73246, 67408, 517] # https://api.curseforge.com/v1/games/1/version-types + return ( + min(version_priorities.index(v["gameVersionTypeId"]) for v in file["sortableGameVersions"]), + file["releaseType"], # releaseType: 1: release, 2: beta, 3: alpha + index, + ) + + files = sorted(enumerate(files), key=lambda x: key(*x)) try: - return files[0]["downloadUrl"] + _, file = files[0] except IndexError: raise FileNotFoundError("No file found") + return cls._get_file_url(file) + + @classmethod + def _get_file_url(cls, file: dict) -> str: + url = file["downloadUrl"] + if url is not None: + return url + + # addon authors can choose to "hide" the download url from the new api, but it can be bruteforced + id = str(file["id"]) + file_name = file["fileName"] + for index in range(len(id)): + x, y = id[:index], id[index:] + url = f"https://edge.forgecdn.net/files/{x}/{y}/{file_name}" + try: + http.open(url).head() + return url + except (urllib.error.HTTPError, urllib.error.URLError) as e: + pass + raise FileNotFoundError("No file URL found")