From 2dcef470ad9cb5153e273da411ebb3e9de6dacb9 Mon Sep 17 00:00:00 2001 From: "Casper V. Kristensen" Date: Sun, 12 Jun 2022 18:45:25 +0200 Subject: [PATCH] Black --- dailyreleases/__main__.py | 3 +- dailyreleases/cache.py | 67 ++++++++++++++++-------- dailyreleases/config.py | 17 ++---- dailyreleases/generation.py | 59 ++++++++++++++------- dailyreleases/main.py | 15 ++++-- dailyreleases/parsing.py | 68 ++++++++++++++++-------- dailyreleases/predbs.py | 36 +++++++------ dailyreleases/reddit.py | 5 +- dailyreleases/stores/__init__.py | 2 +- dailyreleases/stores/gog.py | 17 +++--- dailyreleases/stores/steam.py | 71 ++++++++++++++----------- dailyreleases/stores/web.py | 13 +++-- dailyreleases/util.py | 39 ++++++++------ setup.py | 24 +++------ tests/test_parse_dirname.py | 89 +++++++++++++++++++++++++------- 15 files changed, 329 insertions(+), 196 deletions(-) diff --git a/dailyreleases/__main__.py b/dailyreleases/__main__.py index 32add98..15b3910 100644 --- a/dailyreleases/__main__.py +++ b/dailyreleases/__main__.py @@ -2,6 +2,7 @@ The main terminal-based entry point. Invoke as `dailyreleases' or `python3 -m dailyreleases'. """ -if __name__ == '__main__': +if __name__ == "__main__": from .main import main + main() diff --git a/dailyreleases/cache.py b/dailyreleases/cache.py index b39595e..fd61788 100644 --- a/dailyreleases/cache.py +++ b/dailyreleases/cache.py @@ -14,7 +14,9 @@ from .config import DATA_DIR, CONFIG logger = logging.getLogger(__name__) connection = sqlite3.connect(DATA_DIR.joinpath("cache.sqlite")) -connection.row_factory = sqlite3.Row # allow accessing rows by index and case-insensitively by name +connection.row_factory = ( + sqlite3.Row +) # allow accessing rows by index and case-insensitively by name connection.text_factory = bytes # do not try to decode bytes as utf-8 strings DEFAULT_CACHE_TIME = timedelta(seconds=CONFIG["web"].getint("cache_time")) @@ -32,22 +34,27 @@ class Response: def setup(): - connection.execute(""" + connection.execute( + """ CREATE TABLE IF NOT EXISTS requests (id INTEGER PRIMARY KEY, url TEXT UNIQUE NOT NULL, response BLOB NOT NULL, timestamp INTEGER NOT NULL); - """) + """ + ) def clean(older_than=timedelta(days=3)): - connection.execute(""" + connection.execute( + """ DELETE FROM requests WHERE timestamp < :cutoff; - """, { + """, + { "cutoff": (datetime.utcnow() - older_than).timestamp(), - }) + }, + ) connection.commit() connection.executescript("VACUUM;") @@ -55,8 +62,14 @@ def clean(older_than=timedelta(days=3)): last_request = defaultdict(float) -def get(url: str, params: Mapping = None, cache_time: timedelta = DEFAULT_CACHE_TIME, - ratelimit: Optional[float] = 1, *args, **kwargs) -> Response: +def get( + url: str, + params: Mapping = None, + cache_time: timedelta = DEFAULT_CACHE_TIME, + ratelimit: Optional[float] = 1, + *args, + **kwargs +) -> Response: """ Sends a GET request, caching the result for cache_time. If 'ratelimit' is supplied, requests are rate limited at the host-level to this number of requests per second. @@ -64,41 +77,51 @@ def get(url: str, params: Mapping = None, cache_time: timedelta = DEFAULT_CACHE_ if params is not None: url += "?" + urllib.parse.urlencode(params) request = Request(url, *args, **kwargs) - request.add_header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:65.0) Gecko/20100101 Firefox/65.0") + request.add_header( + "User-Agent", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:65.0) Gecko/20100101 Firefox/65.0", + ) - #logger.debug("Get %s", url) + # logger.debug("Get %s", url) - row = connection.execute(""" + row = connection.execute( + """ SELECT response, timestamp FROM requests WHERE url = :url; - """, { - "url": url - }).fetchone() + """, + {"url": url}, + ).fetchone() - if row is not None and datetime.fromtimestamp(row["timestamp"]) > datetime.utcnow() - cache_time: - #logger.debug("Cache hit: %s", url) + if ( + row is not None + and datetime.fromtimestamp(row["timestamp"]) > datetime.utcnow() - cache_time + ): + # logger.debug("Cache hit: %s", url) return Response(row["response"]) - #logger.debug("Cache miss: %s", url) + # logger.debug("Cache miss: %s", url) if ratelimit is not None: min_interval = 1 / ratelimit elapsed = time.time() - last_request[request.host] wait = min_interval - elapsed if wait > 0: - #logger.debug("Rate-limited for %ss", round(wait, 2)) + # logger.debug("Rate-limited for %ss", round(wait, 2)) time.sleep(wait) response = Response(urlopen(request).read()) last_request[request.host] = time.time() - connection.execute(""" + connection.execute( + """ INSERT OR REPLACE INTO requests(url, response, timestamp) VALUES (:url, :response, :timestamp); - """, { + """, + { "url": url, "response": response.bytes, - "timestamp": datetime.utcnow().timestamp() - }) + "timestamp": datetime.utcnow().timestamp(), + }, + ) connection.commit() return response diff --git a/dailyreleases/config.py b/dailyreleases/config.py index 45d97f3..db6129a 100644 --- a/dailyreleases/config.py +++ b/dailyreleases/config.py @@ -46,7 +46,7 @@ def logging_config(file, level, backup_count) -> dict: "class": "logging.StreamHandler", "stream": "ext://sys.stdout", "formatter": "standard", - "level": level + "level": level, }, "file": { "class": "logging.handlers.TimedRotatingFileHandler", @@ -55,18 +55,11 @@ def logging_config(file, level, backup_count) -> dict: "filename": file, "encoding": "utf-8", "formatter": "standard", - "level": level - } + "level": level, + }, }, - "loggers": { - "dailyreleases": { - "level": level - } - }, - "root": { - "handlers": ["console", "file"], - "level": "WARNING" - } + "loggers": {"dailyreleases": {"level": level}}, + "root": {"handlers": ["console", "file"], "level": "WARNING"}, } diff --git a/dailyreleases/generation.py b/dailyreleases/generation.py index 9da8d56..d4cd388 100644 --- a/dailyreleases/generation.py +++ b/dailyreleases/generation.py @@ -24,7 +24,9 @@ def popularity(release: Release): def row(release: Release): # Bold row if Denuvo crack. We're checking this first so as to not actually insert 'DENUVO' as a highlight - highlights = [h for h in release.highlights if h not in ("DENUVO",)] # avoids modifying original release object + highlights = [ + h for h in release.highlights if h not in ("DENUVO",) + ] # avoids modifying original release object bold = highlights != release.highlights # The rows in the table containing updates will use the full rls_name as the name, while tables @@ -34,22 +36,27 @@ def row(release: Release): else: tags = " ({})".format(" ".join(release.tags)) if release.tags else "" highlights = " **- {}**".format(", ".join(highlights)) if highlights else "" - name = "[{}{}]({}){}".format(util.markdown_escape(release.game_name), - tags, - release.nfo_link, - highlights) + name = "[{}{}]({}){}".format( + util.markdown_escape(release.game_name), tags, release.nfo_link, highlights + ) - stores = ", ".join(f"[{name}]({link})" for name, link in release.store_links.items()) + stores = ", ".join( + f"[{name}]({link})" for name, link in release.store_links.items() + ) if release.score == -1: reviews = "-" else: - num_reviews_humanized = util.humanize(release.num_reviews, precision=1, prefix="dec", suffix="") + num_reviews_humanized = util.humanize( + release.num_reviews, precision=1, prefix="dec", suffix="" + ) reviews = f"{release.score:.0%} ({num_reviews_humanized})" r = (name, release.group, stores, reviews) if bold: - r = tuple(f"**{c.replace('**', '')}**" for c in r) # .replace ensures no nested bold, which is unsupported + r = tuple( + f"**{c.replace('**', '')}**" for c in r + ) # .replace ensures no nested bold, which is unsupported return r @@ -70,18 +77,24 @@ def generate_post(releases: Releases) -> str: # popular game within the group. Games are sorted by popularity internally in the groups as well. group_order = defaultdict(lambda: (0, -1, False)) for release in type_releases: - group_order[release.group] = max(group_order[release.group], popularity(release)) + group_order[release.group] = max( + group_order[release.group], popularity(release) + ) def order(release: Release): - return (group_order[release.group], - release.group, # ensure grouping if two groups share group_order - popularity(release)) + return ( + group_order[release.group], + release.group, # ensure grouping if two groups share group_order + popularity(release), + ) type_releases.sort(key=order, reverse=True) post.append(f"| {type} | Group | Store | Score (Reviews) |") post.append("|:-|:-|:-|:-|") - post.extend("| {} | {} | {} | {} |".format(*row(rls)) for rls in type_releases) + post.extend( + "| {} | {} | {} | {} |".format(*row(rls)) for rls in type_releases + ) post.append("") post.append(" ") @@ -96,7 +109,9 @@ def generate_post(releases: Releases) -> str: # Add link to the previous release thread previous_post = reddit.get_previous_daily_post(CONFIG["reddit"]["posts_subreddit"]) - previous_post_date = re.search("daily release.*[(](.*)[)]", previous_post.title, flags=re.IGNORECASE).group(1) + previous_post_date = re.search( + "daily release.*[(](.*)[)]", previous_post.title, flags=re.IGNORECASE + ).group(1) post.append(f"# [<< {previous_post_date}]({previous_post.url})") # Add epilogue @@ -115,14 +130,14 @@ def generate_post(releases: Releases) -> str: @util.retry(attempts=3, delay=120) def generate(post=False, pm_recipients=None) -> None: - logger.info("-------------------------------------------------------------------------------------------------") + logger.info( + "-------------------------------------------------------------------------------------------------" + ) start_time = time.time() processed = load_processed() pres = predbs.get_pres() - releases = parsing.parse_pres(pre - for pre in pres - if pre.dirname not in processed) + releases = parsing.parse_pres(pre for pre in pres if pre.dirname not in processed) # The date of the post changes at midday instead of midnight to allow calling script after 00:00 title = f"Daily Releases ({(datetime.utcnow() - timedelta(hours=12)).strftime('%B %-d, %Y')})" @@ -133,7 +148,9 @@ def generate(post=False, pm_recipients=None) -> None: if post: # Post to bot's own subreddit bot_subreddit = CONFIG["reddit"]["bot_subreddit"] - reddit_src_post = reddit.submit_post(f"{title} - Source", generated_post_src, bot_subreddit) + reddit_src_post = reddit.submit_post( + f"{title} - Source", generated_post_src, bot_subreddit + ) reddit_post = reddit.submit_post(title, generated_post, bot_subreddit) # Manually approve posts since reddit seem to think posts with many links are spam @@ -155,7 +172,9 @@ def generate(post=False, pm_recipients=None) -> None: cache.clean() logger.info("Execution took %s seconds", int(time.time() - start_time)) - logger.info("-------------------------------------------------------------------------------------------------") + logger.info( + "-------------------------------------------------------------------------------------------------" + ) def load_processed() -> Set[str]: diff --git a/dailyreleases/main.py b/dailyreleases/main.py index c083f7c..850efd6 100644 --- a/dailyreleases/main.py +++ b/dailyreleases/main.py @@ -21,7 +21,9 @@ def listen_inbox() -> None: if message.author in authorized_users: generate(post=True, pm_recipients=(message.author.name,)) else: - logger.info("Discarding PM from %s: not authorized user", message.author) + logger.info( + "Discarding PM from %s: not authorized user", message.author + ) message.mark_read() # mark message read last so we can retry after potential fatal errors except prawcore.PrawcoreException as e: logger.warning("PrawcoreException: %s", e) @@ -30,6 +32,7 @@ def listen_inbox() -> None: print("Exiting (KeyboardInterrupt)") break + def at_midnight() -> None: while True: try: @@ -38,7 +41,9 @@ def at_midnight() -> None: until_midnight = midnight - now logger.info(f"Waiting {until_midnight} until midnight..") sleep(until_midnight.total_seconds()) - generate(post=True, pm_recipients=CONFIG["reddit"]["notify_users"].split(",")) + generate( + post=True, pm_recipients=CONFIG["reddit"]["notify_users"].split(",") + ) except Exception as e: logger.exception(e) except KeyboardInterrupt: @@ -55,7 +60,9 @@ def main() -> None: if mode == "test": generate(post=False) if mode == "immediately": - generate(post=True, pm_recipients=CONFIG["reddit"]["notify_users"].split(",")) + generate( + post=True, pm_recipients=CONFIG["reddit"]["notify_users"].split(",") + ) if mode == "midnight": at_midnight() if mode == "reply": @@ -65,5 +72,5 @@ def main() -> None: raise e -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/dailyreleases/parsing.py b/dailyreleases/parsing.py index 2e5e4d9..a4e2143 100644 --- a/dailyreleases/parsing.py +++ b/dailyreleases/parsing.py @@ -43,7 +43,9 @@ class Release: store_links: Dict[str, str] = field(default_factory=dict) tags: List[str] = field(default_factory=list) highlights: List[str] = field(default_factory=list) - score: int = -1 # score and number of reviews is -1 by default; it is updated if the game exists on Steam + score: int = ( + -1 + ) # score and number of reviews is -1 by default; it is updated if the game exists on Steam num_reviews: int = -1 @@ -111,12 +113,10 @@ BLACKLISTED = ( "Fedora", "openSUSE", "jQuery", - "CSS" - "ASP[._-]NET", + "CSS" "ASP[._-]NET", "Windows[._-]Server", "Lynda", - "OREILLY" - "Wintellectnow", + "OREILLY" "Wintellectnow", "3ds[._-]?Max", "For[._-]Maya", "Cinema4D", @@ -137,8 +137,11 @@ def parse_pre(pre: Pre, offline=False) -> Release: rls_name, group = pre.dirname.rsplit("-", maxsplit=1) # Find game name by matching until one of the stopwords - game_name, *stopwords = re.split("[._-]({})".format("|".join(STOPWORDS + TAGS + HIGHLIGHTS)), - rls_name, flags=re.IGNORECASE) + game_name, *stopwords = re.split( + "[._-]({})".format("|".join(STOPWORDS + TAGS + HIGHLIGHTS)), + rls_name, + flags=re.IGNORECASE, + ) # Prettify game name by substituting word delimiters with spaces game_name = re.sub("[_-]", " ", game_name) @@ -147,14 +150,18 @@ def parse_pre(pre: Pre, offline=False) -> Release: game_name = re.sub("(\w{2,})[.]", "\g<1> ", game_name) # Some stopwords distinguishes two otherwise identical releases (e.g. x86/x64) - we call these tags - tags = [stopword - for stopword in stopwords - if re.match("|".join(TAGS), stopword, flags=re.IGNORECASE)] + tags = [ + stopword + for stopword in stopwords + if re.match("|".join(TAGS), stopword, flags=re.IGNORECASE) + ] # Some stopwords signify an important piece of information and deserve to be highlighted (e.g. PROPER) - highlights = [stopword - for stopword in stopwords - if re.match("|".join(HIGHLIGHTS), stopword, flags=re.IGNORECASE)] + highlights = [ + stopword + for stopword in stopwords + if re.match("|".join(HIGHLIGHTS), stopword, flags=re.IGNORECASE) + ] # Find platform if re.search("mac[._-]?os[._-]?x?", rls_name, flags=re.IGNORECASE): @@ -166,9 +173,15 @@ def parse_pre(pre: Pre, offline=False) -> Release: # Find release type (Game/DLC/Update) # Order of the if-statements is important: Update trumps DLC because an update to a DLC is an update, not a DLC! - if re.search("update|v[0-9]|addon|Crack[._-]?fix|DIR[._-]?FIX|build[._-]?[0-9]+", rls_name, flags=re.IGNORECASE): + if re.search( + "update|v[0-9]|addon|Crack[._-]?fix|DIR[._-]?FIX|build[._-]?[0-9]+", + rls_name, + flags=re.IGNORECASE, + ): rls_type = ReleaseType.UPDATE - elif re.search("(? Release: type=rls_type, platform=platform, tags=tags, - highlights=highlights + highlights=highlights, ) if offline: @@ -204,19 +217,32 @@ def parse_pre(pre: Pre, offline=False) -> Release: try: steam.update_info(release) except Exception as e: # a lot of stuff can go wrong with Steam's API, better catch everything - logger.error("Failed to update release info using Steam's API on %s", release) + logger.error( + "Failed to update release info using Steam's API on %s", release + ) logger.exception(e) - logger.info("Final : %s %s : %s - %s : %s", release.platform, release.type, release.game_name, release.group, - release) + logger.info( + "Final : %s %s : %s - %s : %s", + release.platform, + release.type, + release.game_name, + release.group, + release, + ) return release -Releases = Dict[Platform, Dict[ReleaseType, List[Release]]] # {Windows: {Game: [..], DLC: [..], ..}, Linux: ...} +Releases = Dict[ + Platform, Dict[ReleaseType, List[Release]] +] # {Windows: {Game: [..], DLC: [..], ..}, Linux: ...} def parse_pres(pres: Iterable[Pre]) -> Releases: - releases = {platform: {release_type: [] for release_type in ReleaseType} for platform in Platform} + releases = { + platform: {release_type: [] for release_type in ReleaseType} + for platform in Platform + } for pre in pres: try: release = parse_pre(pre) diff --git a/dailyreleases/predbs.py b/dailyreleases/predbs.py index 701b16e..98ef424 100644 --- a/dailyreleases/predbs.py +++ b/dailyreleases/predbs.py @@ -22,7 +22,9 @@ def get_pres() -> List[Pre]: pres = {} for get in (get_predbme, get_xrel): # in reverse order of preference try: - pres.update((p.dirname, p) for p in get()) # override duplicate dirnames in later iterations + pres.update( + (p.dirname, p) for p in get() + ) # override duplicate dirnames in later iterations except HTTPError as e: logger.error(e) logger.warning("Connection to predb failed, skipping..") @@ -34,20 +36,23 @@ def get_xrel(categories=("CRACKED", "UPDATE"), num_pages=2) -> List[Pre]: logger.debug("Getting pres from xrel.to") def get_releases_in_category(category, page): - r = cache.get("https://api.xrel.to/v2/release/browse_category.json", params={ - "category_name": category, - "ext_info_type": "game", - "per_page": 100, - "page": page - }) + r = cache.get( + "https://api.xrel.to/v2/release/browse_category.json", + params={ + "category_name": category, + "ext_info_type": "game", + "per_page": 100, + "page": page, + }, + ) return r.json["list"] - return [Pre(rls["dirname"], - rls["link_href"], - datetime.fromtimestamp(rls["time"])) - for category in categories - for page in range(1, num_pages) - for rls in get_releases_in_category(category, page)] + return [ + Pre(rls["dirname"], rls["link_href"], datetime.fromtimestamp(rls["time"])) + for category in categories + for page in range(1, num_pages) + for rls in get_releases_in_category(category, page) + ] def get_predbme() -> List[Pre]: @@ -59,7 +64,4 @@ def get_predbme() -> List[Pre]: # Predb.me doesn't show timestamps in the RSS-feed, but the feed is so short it only shows ~72 hours worth of # releases anyway, so we just set timestamp to now. now = datetime.utcnow() - return [Pre(item.find("title").text, - item.find("guid").text, - now) - for item in soup] + return [Pre(item.find("title").text, item.find("guid").text, now) for item in soup] diff --git a/dailyreleases/reddit.py b/dailyreleases/reddit.py index 5e4869e..15489a1 100644 --- a/dailyreleases/reddit.py +++ b/dailyreleases/reddit.py @@ -24,8 +24,9 @@ def submit_post(title, text, subreddit) -> Submission: def get_previous_daily_post(subreddit) -> Submission: logger.info("Getting previous daily post from r/%s", subreddit) - posts = praw.subreddit(subreddit).search('title:"daily releases"', sort="new", syntax="lucene", - time_filter="week") + posts = praw.subreddit(subreddit).search( + 'title:"daily releases"', sort="new", syntax="lucene", time_filter="week" + ) return next( p for p in posts diff --git a/dailyreleases/stores/__init__.py b/dailyreleases/stores/__init__.py index d50d3bd..c9583cb 100644 --- a/dailyreleases/stores/__init__.py +++ b/dailyreleases/stores/__init__.py @@ -30,7 +30,7 @@ def find_store_links(game_name: str) -> Dict[str, str]: "bigfishgames.com/games": "Big Fish Games", "gamejolt.com": "Game Jolt", "alawar.com": "Alawar", - "wildtangent.com": "WildTangent Games" + "wildtangent.com": "WildTangent Games", } # Multiple store links are sometimes returned, but we believe in Google's algorithm and choose the first one diff --git a/dailyreleases/stores/gog.py b/dailyreleases/stores/gog.py index 80acf21..71b7612 100644 --- a/dailyreleases/stores/gog.py +++ b/dailyreleases/stores/gog.py @@ -10,17 +10,16 @@ logger = logging.getLogger(__name__) def search(query: str) -> Optional[str]: logger.debug("Searching GOG for %s", query) - r = cache.get("https://www.gog.com/games/ajax/filtered", params={ - "search": query, - "mediaType": "game", - "limit": 5 - }) - products = {p["title"]: p - for p in r.json["products"] - if p["isGame"]} + r = cache.get( + "https://www.gog.com/games/ajax/filtered", + params={"search": query, "mediaType": "game", "limit": 5}, + ) + products = {p["title"]: p for p in r.json["products"] if p["isGame"]} try: - best_match = products[util.case_insensitive_close_matches(query, products, n=1, cutoff=0.90)[0]] + best_match = products[ + util.case_insensitive_close_matches(query, products, n=1, cutoff=0.90)[0] + ] logger.debug("Best match is '%s'", best_match) return "https://gog.com{url}".format(**best_match) except IndexError: diff --git a/dailyreleases/stores/steam.py b/dailyreleases/stores/steam.py index 3863db8..2ca5a86 100644 --- a/dailyreleases/stores/steam.py +++ b/dailyreleases/stores/steam.py @@ -14,28 +14,32 @@ AppID = TypeVar("AppID", int, str) def appdetails(appid: AppID) -> dict: - r = cache.get("https://store.steampowered.com/api/appdetails", params={ - "appids": appid - }) + r = cache.get( + "https://store.steampowered.com/api/appdetails", params={"appids": appid} + ) return r.json[str(appid)]["data"] def packagedetails(appid: AppID) -> dict: - r = cache.get("https://store.steampowered.com/api/packagedetails", params={ - "packageids": appid - }) + r = cache.get( + "https://store.steampowered.com/api/packagedetails", + params={"packageids": appid}, + ) return r.json[str(appid)]["data"] def appreviews(appid: AppID) -> dict: - r = cache.get(f"https://store.steampowered.com/appreviews/{appid}", params={ - "start_date": -1, - "end_date": -1, - "filter": "summary", - "language": "all", - "purchase_type": "all", - "json": 1 - }) + r = cache.get( + f"https://store.steampowered.com/appreviews/{appid}", + params={ + "start_date": -1, + "end_date": -1, + "filter": "summary", + "language": "all", + "purchase_type": "all", + "json": 1, + }, + ) return r.json["query_summary"] @@ -59,26 +63,22 @@ def eula(appid: AppID) -> str: def search(query: str) -> Optional[str]: logger.debug("Searching Steam store for %s", query) - r = cache.get("https://store.steampowered.com/search/suggest", params={ - "term": query, - "f": "json", - "cc": "US", - "l": "english" - }) + r = cache.get( + "https://store.steampowered.com/search/suggest", + params={"term": query, "f": "json", "cc": "US", "l": "english"}, + ) # Reverse results to make the first one take precedence over later ones if multiple results have the same name. # E.g. "Wolfenstein II: The New Colossus" has both international and german version under the same name. items = {item["name"]: item for item in reversed(r.json)} try: - best_match = items[util.case_insensitive_close_matches(query, items, n=1, cutoff=0.90)[0]] + best_match = items[ + util.case_insensitive_close_matches(query, items, n=1, cutoff=0.90)[0] + ] logger.debug("Best match is '%s'", best_match) - type_to_slug = { - "game": "app", - "dlc": "app", - "bundle": "bundle" - } - slug = type_to_slug.get(best_match['type'], best_match['type']) + type_to_slug = {"game": "app", "dlc": "app", "bundle": "bundle"} + slug = type_to_slug.get(best_match["type"], best_match["type"]) return f"https://store.steampowered.com/{slug}/{best_match['id']}" except IndexError: logger.debug("Unable to find %s in Steam search results", query) @@ -91,7 +91,9 @@ def update_info(release: Release) -> None: link_type, appid = re.search("(app|sub|bundle)(?:/)([0-9]+)", link).groups() if link_type == "bundle": - logger.debug("Steam link is to bundle: not utilizing API") # Steam has no public API for bundles + logger.debug( + "Steam link is to bundle: not utilizing API" + ) # Steam has no public API for bundles return # If the link is a package on Steam (e.g. game + dlc), we need to find the base game of the package @@ -105,7 +107,9 @@ def update_info(release: Release) -> None: # We guesstimate the base game as the most popular app (i.e. the one with most reviews) among the first three package_appids = [app["id"] for app in package_details["apps"][:3]] package_apps_details = [appdetails(appid) for appid in package_appids] - details = max(package_apps_details, key=lambda app: reviews(app["steam_appid"])[1]) + details = max( + package_apps_details, key=lambda app: reviews(app["steam_appid"])[1] + ) appid = details["steam_appid"] # Otherwise, if the release is a single game on Steam @@ -123,6 +127,11 @@ def update_info(release: Release) -> None: release.type = "DLC" # Add highlight if "denuvo" occurs in Steam's DRM notice or potential 3rd-party EULA - if "denuvo" in details.get("drm_notice", "").lower() or "denuvo" in eula(appid).lower(): - logger.info("'denuvo' found in Steam DRM-notice/EULA; adding 'DENUVO' to highlights") + if ( + "denuvo" in details.get("drm_notice", "").lower() + or "denuvo" in eula(appid).lower() + ): + logger.info( + "'denuvo' found in Steam DRM-notice/EULA; adding 'DENUVO' to highlights" + ) release.highlights.append("DENUVO") diff --git a/dailyreleases/stores/web.py b/dailyreleases/stores/web.py index 4e6ae1f..69c147d 100644 --- a/dailyreleases/stores/web.py +++ b/dailyreleases/stores/web.py @@ -11,11 +11,14 @@ logger = logging.getLogger(__name__) def web_search(query: str) -> List[str]: logger.debug("Searching Google for %s", query) try: - r = cache.get("https://www.googleapis.com/customsearch/v1", params={ - "key": CONFIG["google"]["key"], - "cx": CONFIG["google"]["cx"], - "q": query - }) + r = cache.get( + "https://www.googleapis.com/customsearch/v1", + params={ + "key": CONFIG["google"]["key"], + "cx": CONFIG["google"]["cx"], + "q": query, + }, + ) return [result["link"] for result in r.json["items"]] except (KeyError, HTTPError) as e: logger.exception(e) diff --git a/dailyreleases/util.py b/dailyreleases/util.py index 6dda767..f0d5912 100644 --- a/dailyreleases/util.py +++ b/dailyreleases/util.py @@ -14,31 +14,35 @@ def humanize(n: int, precision=2, prefix="bin", suffix="B") -> str: """ abbrevs = { "dec": [ - (1000 ** 5, 'P' + suffix), - (1000 ** 4, 'T' + suffix), - (1000 ** 3, 'G' + suffix), - (1000 ** 2, 'M' + suffix), - (1000 ** 1, 'k' + suffix) + (1000 ** 5, "P" + suffix), + (1000 ** 4, "T" + suffix), + (1000 ** 3, "G" + suffix), + (1000 ** 2, "M" + suffix), + (1000 ** 1, "k" + suffix), ], "bin": [ - (1 << 50, 'Pi' + suffix), - (1 << 40, 'Ti' + suffix), - (1 << 30, 'Gi' + suffix), - (1 << 20, 'Mi' + suffix), - (1 << 10, 'ki' + suffix) - ] + (1 << 50, "Pi" + suffix), + (1 << 40, "Ti" + suffix), + (1 << 30, "Gi" + suffix), + (1 << 20, "Mi" + suffix), + (1 << 10, "ki" + suffix), + ], } factor, suffix = next(((f, s) for f, s in abbrevs[prefix] if n >= f), (1, suffix)) return "{1:.{0}f}".format(precision, n / factor).rstrip("0").rstrip(".") + suffix -def case_insensitive_close_matches(word: str, possibilities: Sequence[str], n=3, cutoff=0.6) -> List[str]: +def case_insensitive_close_matches( + word: str, possibilities: Sequence[str], n=3, cutoff=0.6 +) -> List[str]: """ Python's difflib.get_close_matches does case sensitive sequence matching, this function decorates the library function to make it case insensitive. """ possibilities = {sequence.lower(): sequence for sequence in possibilities} - close_matches = difflib.get_close_matches(word.lower(), possibilities, n=n, cutoff=cutoff) + close_matches = difflib.get_close_matches( + word.lower(), possibilities, n=n, cutoff=cutoff + ) return [possibilities[m] for m in close_matches] @@ -72,16 +76,21 @@ def retry(attempts=3, delay=0): """ Retry wrapped function `attempts` times. """ + def decorator(func): @wraps(func) def wrapper(*args, **kwargs): - for i in range(1, attempts+1): + for i in range(1, attempts + 1): try: return func(*args, **kwargs) except Exception as e: - logger.exception(f"{func.__name__} attempt {i}/{attempts}", exc_info=e) + logger.exception( + f"{func.__name__} attempt {i}/{attempts}", exc_info=e + ) if i >= attempts: raise time.sleep(delay) + return wrapper + return decorator diff --git a/setup.py b/setup.py index a335aba..d5a44c5 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,6 @@ # Always prefer setuptools over distutils from setuptools import setup, find_packages + # To use a consistent encoding from codecs import open from os import path @@ -18,9 +19,7 @@ setup( description="A reddit bot that consolidates scene releases", long_description=long_description, long_description_content_type="text/markdown", - project_urls={ - "Source": "https://git.caspervk.net/caspervk/dailyreleases.git" - }, + project_urls={"Source": "https://git.caspervk.net/caspervk/dailyreleases.git"}, author=__author__, classifiers=[ "Development Status :: 3 - Alpha", @@ -31,18 +30,7 @@ setup( license=__licence__, packages=find_packages(exclude=["tests"]), include_package_data=True, - package_data={ - "dailyreleases": [ - "*.default" - ] - }, - install_requires=[ - "praw==6.4.0", - "beautifulsoup4==4.7.1" - ], - entry_points={ - "console_scripts": [ - "dailyreleases = dailyreleases.main:main" - ] - }, -) \ No newline at end of file + package_data={"dailyreleases": ["*.default"]}, + install_requires=["praw==6.4.0", "beautifulsoup4==4.7.1"], + entry_points={"console_scripts": ["dailyreleases = dailyreleases.main:main"]}, +) diff --git a/tests/test_parse_dirname.py b/tests/test_parse_dirname.py index 067622d..068f460 100644 --- a/tests/test_parse_dirname.py +++ b/tests/test_parse_dirname.py @@ -22,17 +22,25 @@ class ParseDirnameTestCase(unittest.TestCase): self.assertEqual([], r.highlights) def test_error_on_blacklisted_word(self): - pre = Pre("Anthemion.Software.DialogBlocks.v5.15.LINUX.Incl.Keygen-AMPED", "nfo_link", datetime.utcnow()) + pre = Pre( + "Anthemion.Software.DialogBlocks.v5.15.LINUX.Incl.Keygen-AMPED", + "nfo_link", + datetime.utcnow(), + ) with self.assertRaisesRegex(ParseError, "Contains blacklisted word"): parsing.parse_pre(pre) def test_error_on_old(self): - pre = Pre("Aztez-DARKSiDERS", "nfo_link", datetime.utcnow() - timedelta(hours=50)) + pre = Pre( + "Aztez-DARKSiDERS", "nfo_link", datetime.utcnow() - timedelta(hours=50) + ) with self.assertRaisesRegex(ParseError, "Older than 48 hours"): parsing.parse_pre(pre) def test_error_on_software(self): - pre = Pre("Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED", "nfo_link", datetime.utcnow()) + pre = Pre( + "Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED", "nfo_link", datetime.utcnow() + ) with self.assertRaisesRegex(ParseError, "No store link: probably software"): parsing.parse_pre(pre) @@ -43,7 +51,11 @@ class ParseDirnameTestCase(unittest.TestCase): self.assertEqual("Battlefield.1-CPY", r.dirname) def test_update(self): - pre = Pre("Car.Mechanic.Simulator.2018.Plymouth.Update.v1.5.1.Hotfix-PLAZA", "nfo_link", datetime.utcnow()) + pre = Pre( + "Car.Mechanic.Simulator.2018.Plymouth.Update.v1.5.1.Hotfix-PLAZA", + "nfo_link", + datetime.utcnow(), + ) r = parsing.parse_pre(pre) self.assertEqual(ReleaseType.UPDATE, r.type) self.assertIn("store.steampowered.com/app/754920", r.store_links["Steam"]) @@ -55,7 +67,9 @@ class ParseDirnameTestCase(unittest.TestCase): self.assertIn("store.steampowered.com/app/705120", r.store_links["Steam"]) def test_macos_release(self): - pre = Pre("The_Fall_Part_2_Unbound_MacOS-Razor1911", "nfo_link", datetime.utcnow()) + pre = Pre( + "The_Fall_Part_2_Unbound_MacOS-Razor1911", "nfo_link", datetime.utcnow() + ) r = parsing.parse_pre(pre) self.assertEqual(Platform.OSX, r.platform) self.assertEqual(ReleaseType.GAME, r.type) @@ -63,7 +77,11 @@ class ParseDirnameTestCase(unittest.TestCase): self.assertIn("gog.com/game/the_fall_part_2_unbound", r.store_links["GOG"]) def test_macosx_update(self): - pre = Pre("Man_O_War_Corsair_Warhammer_Naval_Battles_v1.3.2_MacOSX-Razor1911", "nfo_link", datetime.utcnow()) + pre = Pre( + "Man_O_War_Corsair_Warhammer_Naval_Battles_v1.3.2_MacOSX-Razor1911", + "nfo_link", + datetime.utcnow(), + ) r = parsing.parse_pre(pre) self.assertEqual(Platform.OSX, r.platform) self.assertEqual(ReleaseType.UPDATE, r.type) @@ -71,7 +89,9 @@ class ParseDirnameTestCase(unittest.TestCase): self.assertIn("gog.com/game/man_o_war_corsair", r.store_links["GOG"]) def test_linux_release(self): - pre = Pre("Sphinx_And_The_Cursed_Mummy_Linux-Razor1911", "nfo_link", datetime.utcnow()) + pre = Pre( + "Sphinx_And_The_Cursed_Mummy_Linux-Razor1911", "nfo_link", datetime.utcnow() + ) r = parsing.parse_pre(pre) self.assertEqual(Platform.LINUX, r.platform) self.assertEqual(ReleaseType.GAME, r.type) @@ -91,7 +111,11 @@ class ParseDirnameTestCase(unittest.TestCase): self.assertIn("store.steampowered.com/app/558244", r.store_links["Steam"]) def test_incl_dlc_update(self): - pre = Pre("Wolfenstein.II.The.New.Colossus.Update.5.incl.DLC-CODEX", "nfo_link", datetime.utcnow()) + pre = Pre( + "Wolfenstein.II.The.New.Colossus.Update.5.incl.DLC-CODEX", + "nfo_link", + datetime.utcnow(), + ) r = parsing.parse_pre(pre) self.assertEqual(ReleaseType.UPDATE, r.type) self.assertIn("store.steampowered.com/app/612880", r.store_links["Steam"]) @@ -113,13 +137,20 @@ class ParseDirnameTestCase(unittest.TestCase): def test_non_steam(self): pre = Pre("Battlefield.1.REPACK-CPY", "nfo_link", datetime.utcnow()) r = parsing.parse_pre(pre) - self.assertIn("www.origin.com/usa/en-us/store/battlefield/battlefield-1", r.store_links["Origin"]) + self.assertIn( + "www.origin.com/usa/en-us/store/battlefield/battlefield-1", + r.store_links["Origin"], + ) self.assertEqual(-1, r.score) self.assertEqual(-1, r.num_reviews) def test_gog_exclusive(self): # TODO: Actually use GOG API (gog.update_info) - pre = Pre("Dungeons.and.Dragons.Dragonshard.v2.0.0.10.Multilingual-DELiGHT", "nfo_link", datetime.utcnow()) + pre = Pre( + "Dungeons.and.Dragons.Dragonshard.v2.0.0.10.Multilingual-DELiGHT", + "nfo_link", + datetime.utcnow(), + ) r = parsing.parse_pre(pre) self.assertIn("gog.com/game/dungeons_dragons_dragonshard", r.store_links["GOG"]) self.assertEqual(-1, r.score) @@ -132,7 +163,9 @@ class ParseDirnameTestCase(unittest.TestCase): def test_epic_games_exclusive(self): pre = Pre("Journey-CODEX", "nfo_link", datetime.utcnow()) r = parsing.parse_pre(pre) - self.assertIn("epicgames.com/store/en-US/product/journey", r.store_links["Epic Games"]) + self.assertIn( + "epicgames.com/store/en-US/product/journey", r.store_links["Epic Games"] + ) def test_score_non_steam(self): pre = Pre("Ode.RIP.MULTI12-SiMPLEX", "nfo_link", datetime.utcnow()) @@ -140,7 +173,11 @@ class ParseDirnameTestCase(unittest.TestCase): self.assertEqual(-1, r.score) def test_tags(self): - pre = Pre("The.Curious.Expedition.v1.3.7.1.MULTI.7.RIP-Unleashed", "nfo_link", datetime.utcnow()) + pre = Pre( + "The.Curious.Expedition.v1.3.7.1.MULTI.7.RIP-Unleashed", + "nfo_link", + datetime.utcnow(), + ) r = parsing.parse_pre(pre) self.assertIn("gog.com/game/curious_expedition_the", r.store_links["GOG"]) self.assertEqual(["MULTI.7", "RIP"], r.tags) @@ -149,13 +186,21 @@ class ParseDirnameTestCase(unittest.TestCase): pre = Pre("Anno.2070.Complete.Edition-FAKE", "nfo_link", datetime.utcnow()) r = parsing.parse_pre(pre) self.assertEqual("Anno 2070 Complete Edition", r.game_name) - self.assertGreaterEqual(r.num_reviews, 9354) # make sure we got the right game from the package + self.assertGreaterEqual( + r.num_reviews, 9354 + ) # make sure we got the right game from the package self.assertIn("store.steampowered.com/sub/26683", r.store_links["Steam"]) def test_steam_package_with_dlc_first(self): - pre = Pre("The.Witcher.3.Wild.Hunt.Game.of.The.Year.Edition-RELOADED", "nfo_link", datetime.utcnow()) + pre = Pre( + "The.Witcher.3.Wild.Hunt.Game.of.The.Year.Edition-RELOADED", + "nfo_link", + datetime.utcnow(), + ) r = parsing.parse_pre(pre) - self.assertEqual("The Witcher 3: Wild Hunt - Game of the Year Edition", r.game_name) + self.assertEqual( + "The Witcher 3: Wild Hunt - Game of the Year Edition", r.game_name + ) self.assertEqual(ReleaseType.GAME, r.type) self.assertIn("store.steampowered.com/sub/124923", r.store_links["Steam"]) @@ -179,14 +224,22 @@ class ParseDirnameTestCase(unittest.TestCase): self.assertEqual(["DENUVO"], r.highlights) def test_episode_release(self): - pre = Pre("Life.is.Strange.Before.the.Storm.Episode.3-CODEX", "nfo_link", datetime.utcnow()) + pre = Pre( + "Life.is.Strange.Before.the.Storm.Episode.3-CODEX", + "nfo_link", + datetime.utcnow(), + ) r = parsing.parse_pre(pre) self.assertEqual("Life is Strange: Before the Storm Episode 3", r.game_name) self.assertEqual(ReleaseType.DLC, r.type) self.assertIn("store.steampowered.com/app/704740", r.store_links["Steam"]) def test_season_and_episode_release(self): - pre = Pre("Minecraft.Story.Mode.Season.Two.Episode.5.MacOSX-RELOADED", "nfo_link", datetime.utcnow()) + pre = Pre( + "Minecraft.Story.Mode.Season.Two.Episode.5.MacOSX-RELOADED", + "nfo_link", + datetime.utcnow(), + ) r = parsing.parse_pre(pre) self.assertEqual("Minecraft Story Mode Season Two Episode 5", r.game_name) @@ -216,5 +269,5 @@ class ParseDirnameTestCase(unittest.TestCase): self.assertEqual("GTA 5 The Complete Edition", r.game_name) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()