diff --git a/dailyreleases/__init__.py b/dailyreleases/__init__.py index f8421a7..c7a2c80 100644 --- a/dailyreleases/__init__.py +++ b/dailyreleases/__init__.py @@ -1,3 +1,3 @@ -__version__ = "0.0.2" +__version__ = "0.0.3" __author__ = "Casper V. Kristensen" __licence__ = "GPLv3" diff --git a/dailyreleases/cache.py b/dailyreleases/cache.py new file mode 100644 index 0000000..5785924 --- /dev/null +++ b/dailyreleases/cache.py @@ -0,0 +1,103 @@ +import json +import logging +import sqlite3 +import urllib.parse +import urllib.request +from datetime import timedelta, datetime +from http.client import HTTPResponse +from typing import Mapping +from urllib.request import Request, urlopen + +from .config import DATA_DIR, CONFIG + +logger = logging.getLogger(__name__) + + +class Response: + def __init__(self, response: HTTPResponse = None, bytes: bytes = None) -> None: + if response is not None: + self.bytes = response.read() + else: + self.bytes = bytes + self.text = self.bytes.decode() # TODO: Detect encoding + + @property + def json(self): + return json.loads(self.bytes) + + +connection = sqlite3.connect(DATA_DIR.joinpath("cache.sqlite")) +connection.row_factory = sqlite3.Row # allow accessing rows by index and case-insensitively by name +connection.text_factory = bytes # do not try to decode bytes as utf-8 strings + +cache_time = timedelta(seconds=CONFIG["web"].getint("cache_time")) +logger.info("Requests cache time is %s", cache_time) + +connection.executescript( + f""" + CREATE TABLE IF NOT EXISTS + requests (id INTEGER PRIMARY KEY, + url TEXT UNIQUE NOT NULL, + response BLOB NOT NULL, + timestamp INTEGER NOT NULL); + + DELETE FROM requests + WHERE timestamp < {(datetime.utcnow() - cache_time).timestamp()}; + + VACUUM; + """ +) + + +def get(url: str, params: Mapping = None, *args, **kwargs) -> Response: + if params is not None: + url += "?" + urllib.parse.urlencode(params) + request = Request(url, *args, **kwargs) + request.add_header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:65.0) Gecko/20100101 Firefox/65.0") + + row = connection.execute( + """ + SELECT response, timestamp + FROM requests + WHERE url = :url; + """, { + "url": url + } + ).fetchone() + + # Cache miss + if row is None: + response = Response(urlopen(request)) + connection.execute( + """ + INSERT INTO requests(url, response, timestamp) + VALUES (:url, :response, :timestamp); + """, { + "url": url, + "response": response.bytes, + "timestamp": datetime.utcnow().timestamp() + } + ) + connection.commit() + return response + + # Cached and fresh + if datetime.fromtimestamp(row["timestamp"]) > datetime.utcnow() - cache_time: + return Response(bytes=row["response"]) + + # Cached but stale + response = Response(urlopen(request)) + connection.execute( + """ + UPDATE requests + SET response = :response, + timestamp = :timestamp + WHERE url = :url; + """, { + "url": url, + "response": response.bytes, + "timestamp": datetime.utcnow().timestamp() + } + ) + connection.commit() + return response diff --git a/dailyreleases/config.ini.default b/dailyreleases/config.ini.default index 609306e..d1c5908 100644 --- a/dailyreleases/config.ini.default +++ b/dailyreleases/config.ini.default @@ -9,7 +9,6 @@ mode = test [logging] level = DEBUG -file = logs/main.log backup_count = 10 diff --git a/dailyreleases/config.py b/dailyreleases/config.py index 09420b2..45d97f3 100644 --- a/dailyreleases/config.py +++ b/dailyreleases/config.py @@ -1,7 +1,6 @@ import configparser import logging import logging.config -import os import shutil from pathlib import Path @@ -18,8 +17,7 @@ def read_config() -> configparser.ConfigParser: Read and return config file. Copies default config template to data dir if it doesn't already exists. """ if not CONFIG_FILE.exists(): - print("Copying default configuration file..") - os.makedirs(DATA_DIR, exist_ok=True) + DATA_DIR.mkdir(exist_ok=True) shutil.copyfile(DEFAULT_CONFIG_FILE, CONFIG_FILE) print("Please customize", CONFIG_FILE) @@ -31,19 +29,8 @@ def read_config() -> configparser.ConfigParser: return config -def initialize_logging(config: configparser.ConfigParser): - """ - Set up logging. - """ - log_file = DATA_DIR.joinpath(config["logging"]["file"]) - log_level = config["logging"]["level"] - log_backup_count = config["logging"].getint("backup_count") +CONFIG = read_config() - os.makedirs(log_file.parent, exist_ok=True) - logging.config.dictConfig(logging_config(log_file, log_level, log_backup_count)) - - logger.info("Logging level is %s", log_level) - logger.info("Logging to %s - backup count is %s", log_file, log_backup_count) def logging_config(file, level, backup_count) -> dict: return { @@ -65,7 +52,7 @@ def logging_config(file, level, backup_count) -> dict: "class": "logging.handlers.TimedRotatingFileHandler", "when": "midnight", "backupCount": backup_count, - "filename": DATA_DIR.joinpath(file), + "filename": file, "encoding": "utf-8", "formatter": "standard", "level": level @@ -81,3 +68,21 @@ def logging_config(file, level, backup_count) -> dict: "level": "WARNING" } } + + +def initialize_logging(config: configparser.ConfigParser = CONFIG) -> None: + """ + Set up logging. + """ + file = DATA_DIR.joinpath("logs/main.log") + level = config["logging"]["level"] + backup_count = config["logging"].getint("backup_count") + + file.parent.mkdir(exist_ok=True) + logging.config.dictConfig(logging_config(file, level, backup_count)) + + logger.info("Logging level is %s", level) + logger.info("Logging to %s - backup count is %s", file, backup_count) + + +initialize_logging() diff --git a/dailyreleases/gog.py b/dailyreleases/gog.py deleted file mode 100644 index 0545640..0000000 --- a/dailyreleases/gog.py +++ /dev/null @@ -1,29 +0,0 @@ -import logging - -from dailyreleases import util - -logger = logging.getLogger(__name__) - - -class GOG(object): - def __init__(self, cache) -> None: - self.cache = cache - - def search(self, query): - logger.debug("Searching GOG for %s", query) - payload = { - "limit": 5, - "search": query - } - products = {p["title"]: p - for p in self.cache.get("https://www.gog.com/games/ajax/filtered", params=payload).json()["products"] - if p["isGame"]} - - best_match = util.case_insensitive_close_matches(query, products, n=1, cutoff=0.90) - - if not best_match: - logger.debug("Unable to find %s in GOG search results", query) - return - - logger.debug("Best match is '%s'", best_match[0]) - return "https://gog.com{url}".format(**products[best_match[0]]) diff --git a/dailyreleases/main.py b/dailyreleases/main.py index 7a502b3..afaeda5 100644 --- a/dailyreleases/main.py +++ b/dailyreleases/main.py @@ -1,445 +1,195 @@ import inspect -import json import logging import re -import string import textwrap import time from collections import defaultdict from datetime import datetime, timedelta +from typing import Set import prawcore -import requests_cache -from dailyreleases import config, __version__, util -from dailyreleases.gog import GOG -from dailyreleases.predb import Predb -from dailyreleases.reddit import Reddit -from dailyreleases.steam import Steam -from dailyreleases.web import Web +from . import __version__, util, reddit, predbs, parsing +from .config import CONFIG, DATA_DIR +from .parsing import ParsedReleases, Release, ReleaseType logger = logging.getLogger(__name__) -class DailyReleasesBot(object): - def __init__(self) -> None: - print(f"Starting Daily Releases Bot v{__version__}") +def listen_inbox() -> None: + logger.info("Listening on reddit inbox stream") + authorized_users = CONFIG["reddit"]["authorized_users"].split(",") - # Load config file and initialize logging based on its contents - self.config = config.read_config() - config.initialize_logging(self.config) - - # Setup caching of requests; helps reduce the number of requests if the same game gets multiple releases - cache_time = self.config["web"].getint("cache_time") - self.cache = requests_cache.core.CachedSession(cache_name=str(config.DATA_DIR.joinpath("cache")), - expire_after=cache_time) - logger.info("Requests cache time is %ss", cache_time) - - # Initialize sub-modules - self.web = Web(self.config, self.cache) - self.predb = Predb(self.cache) - self.steam = Steam(self.cache) - self.gog = GOG(self.cache) - self.reddit = Reddit(self.config) - - def run(self): - mode = self.config["main"]["mode"] - logger.info("Mode is %s", mode) - - if mode == "test": - self.generate(post=False) - - if mode == "immediately": - self.generate(post=True, pm_recipients=self.config["reddit"]["notify_users"].split(",")) - - if mode == "reply": - self.listen_inbox() - - def listen_inbox(self): - logger.info("Listening on reddit inbox stream") - authorized_users = self.config["reddit"]["authorized_users"].split(",") - - while True: - try: - for message in self.reddit.praw.inbox.stream(): - if message.author in authorized_users: - self.generate(post=True, pm_recipients=(message.author.name,)) - else: - logger.info("Discarding PM from %s: not authorized user", message.author) - - message.mark_read() # mark message read last so we can retry after potential fatal errors - - except prawcore.PrawcoreException as e: - logger.warning("PrawcoreException: %s", e) - logger.info("Restarting inbox listener..") - - except KeyboardInterrupt: - print("Exiting (KeyboardInterrupt)") - break - - def find_store_links(self, game_name) -> dict: - links = {} - - # Steam - steam_link = self.steam.search(game_name) - if steam_link: - links["Steam"] = steam_link - - # GOG - gog_link = self.gog.search(game_name) - if gog_link: - links["GOG"] = f"{gog_link} 'DRM-Free! 👍'" # hover text - - if links: - return links - - # If none of those worked, try Googling the game - known_stores = { - "store.steampowered.com/(app|sub|bundle)": "Steam", # order doesn't matter - "gog.com/game": "GOG", - "origin.com": "Origin", - "ubi(soft)?.com": "Ubisoft", - "www.microsoft.com/.*p": "Microsoft Store", - "itch.io": "Itch.io", - "bigfishgames.com": "Big Fish Games", - "gamejolt.com": "Game Jolt", - "alawar.com": "Alawar", - "wildtangent.com": "WildTangent Games" - } - - # Multiple store links are sometimes returned, but we believe in Google's algorithm and choose the first one - for link in self.web.search(f"{game_name} buy"): - for store_url, store_name in known_stores.items(): - if re.search(store_url, link, flags=re.IGNORECASE): - return {store_name: link} - - logger.debug("Unable to find store links for %s", game_name) - return {} - - def parse_dirname(self, dirname): - logger.info("---") - logger.info("Parsing: %s", dirname) - - # Extract group name - rls_name, group = dirname.rsplit("-", 1) - - # Find game name by matching until one of the stopwords - game_name, *stopwords = re.split("[._-](update|v[0-9]+|Crack[._-]?fix|mac[._-]?os[._-]?x?|linux|MULTI[0-9]+|RIP" - "|GOG|PROPER|REPACK|Multilanguage|incl|Standalone|x(?:86|64)" - "|(?:86|64)[._-]?bit|German|CZECH|RUSSIAN|KOREAN|ITALIAN|SWEDISH|DANISH|French" - "|Slovak|DIR[._-]?FIX|build[._-]?[0-9]+|READNFO|Hotfix|DLC[._-]?Unlocker" - "|iNTERNAL|Steam[._-]?Edition)", - rls_name, flags=re.IGNORECASE) - - # Prettify game name by substituting word delimiters with spaces and capitalizing each word. Delimiters - # separated by fewer than two letters are not substituted, to allow titles like "R.O.V.E.R." - game_name = string.capwords(re.sub("[._-]([a-zA-Z]{2,}|[0-9]+)", " \g<1>", game_name)) - - # Some stopwords are interesting enough to add next to the game in parentheses - we call these tags - tags = [stopword - for stopword in stopwords - if re.match("Crack[._-]?fix|MULTI[0-9]+|RIP|REPACK|x(?:86|64)|(?:86|64)[._-]?bit|German|CZECH|RUSSIAN" - "|KOREAN|ITALIAN|SWEDISH|DANISH|French|Slovak|Hotfix|DIR[._-]?FIX", - stopword, flags=re.IGNORECASE)] - - # Some stopwords are even more interesting and deserve to be highlighted (e.g. '- PROPER' next to the name) - highlights = [highlight - for highlight in stopwords - if re.match("PROPER|READNFO", highlight, flags=re.IGNORECASE)] - - # Find platform - if re.search("mac[._-]?os[._-]?x?", rls_name, flags=re.IGNORECASE): - platform = "Mac OSX" - elif re.search("linux", rls_name, flags=re.IGNORECASE): - platform = "Linux" - else: - platform = "Windows" - - # Find type (game/dlc/update) - # Order of the if-statements is important: Update trumps DLC because an update to a DLC is an update, not a DLC! - rls_type = "Game" - if re.search("(? str: + post = [] + for platform, platform_releases in parsed_releases.items(): + if sum(len(pr) for pr in platform_releases.values()) == 0: + continue - def generate(self, post=False, pm_recipients=None): - logger.info("-------------------------------------------------------------------------------------------------") - start_time = time.time() + post.append(f"# {platform}") + for release_type, releases in platform_releases.items(): + if not releases: + continue - already_posted = self.load_already_posted() + # Releases in the tables are grouped by release group, and the groups are ordered according to the most + # popular game within the group. Games are sorted by popularity internally in the groups as well. + def popularity(release: Release): + # The popularity of a game is defined by the number of reviews it has on Steam, however, we rank RIPs + # lower than non-RIPs so the same game released as both will sort the non-RIP first. + is_rip = "RIP" in [tag.upper() for tag in release.tags] + return release.num_reviews, not is_rip - releases = self.predb.get_releases() - parsed_releases, failed_dirnames = self.parse_releases(releases, already_posted) + group_order = defaultdict(lambda: (-1, False)) + for release in releases: + group_order[release.group] = max(group_order[release.group], popularity(release)) - # The date of the post changes at midday instead of midnight to allow calling script after 00:00 - title = "Daily Releases ({})".format((datetime.today() - timedelta(hours=12)).strftime("%B %-d, %Y")) + def order(release: Release): + return (group_order[release.group], + release.group, # ensure grouping if two groups share group_order + popularity(release)) - generated_post = self.generate_post(parsed_releases) - generated_post_src = textwrap.indent(generated_post, " ") + def row(release: Release): + # The rows in the table containing updates will use the full rls_name as the name, while tables + # containing game and DLC releases will show tags and highlights, as well as the stylized game_name. + if release_type == ReleaseType.UPDATE: + name = f"[{release.rls_name}]({release.nfo_link})" + else: + tags = " ({})".format(" ".join(release.tags)) if release.tags else "" + highlights = " **- {}**".format(", ".join(release.highlights)) if release.highlights else "" + name = "[{}{}]({}){}".format(util.markdown_escape(release.game_name), + tags, + release.nfo_link, + highlights) - if post: - # Post to bot's own subreddit - bot_subreddit = self.config["reddit"]["bot_subreddit"] - reddit_src_post = self.reddit.submit_post(f"{title} - Source", generated_post_src, bot_subreddit) - reddit_post = self.reddit.submit_post(title, generated_post, bot_subreddit) + if release.score == -1: + reviews = "-" + else: + num_reviews_humanized = util.humanize(release.num_reviews, precision=1, prefix="dec", suffix="") + reviews = f"{release.score:.0%} ({num_reviews_humanized})" - # Manually approve posts since reddit seem to think posts with many links are spam - reddit_src_post.mod.approve() - reddit_post.mod.approve() + stores = ", ".join(f"[{name}]({link})" for name, link in release.store_links.items()) - self.save_already_posted(already_posted) + return name, release.group, stores, reviews + + post.append(f"| {release_type} | Group | Store | Score (Reviews) |") + post.append("|:-|:-|:-|:-|") + post.extend("| {} | {} | {} | {} |".format(*row(rls)) for rls in sorted(releases, key=order, reverse=True)) + + post.append("") + post.append(" ") + post.append("") + + post.append("") + post.append("") + + if not post: + logger.warning("Post is empty!") + post.append("No releases today! :o") + + # Add link to the previous release thread + previous_post = reddit.get_previous_daily_post(CONFIG["reddit"]["posts_subreddit"]) + previous_post_date = re.search("daily release.*[(](.*)[)]", previous_post.title, flags=re.IGNORECASE).group(1) + post.append(f"# [<< {previous_post_date}]({previous_post.url})") + + # Add epilogue + try: + with DATA_DIR.joinpath("epilogue.txt").open() as file: + post.extend(line.rstrip() for line in file.readlines()) + except FileNotFoundError: + logger.info("No epilogue.txt") + + # Convert post list to string + post_str = "\n".join(post) + + logger.debug("Generated post:\n%s", post_str) + return post_str + + +def generate(post=False, pm_recipients=None) -> None: + logger.info("-------------------------------------------------------------------------------------------------") + start_time = time.time() + + already_posted = load_already_posted() + + releases = predbs.get_releases() + # Remove old releases from already_posted to save space + already_posted.intersection_update(releases) + + parsed_releases = parsing.parse_releases(releases, already_posted) + + # The date of the post changes at midday instead of midnight to allow calling script after 00:00 + title = f"Daily Releases ({(datetime.today() - timedelta(hours=12)).strftime('%B %-d, %Y')})" + + generated_post = generate_post(parsed_releases) + generated_post_src = textwrap.indent(generated_post, " ") + + if post: + # Post to bot's own subreddit + bot_subreddit = CONFIG["reddit"]["bot_subreddit"] + reddit_src_post = reddit.submit_post(f"{title} - Source", generated_post_src, bot_subreddit) + reddit_post = reddit.submit_post(title, generated_post, bot_subreddit) + + # Manually approve posts since reddit seem to think posts with many links are spam + reddit_src_post.mod.approve() + reddit_post.mod.approve() + + save_already_posted(already_posted) if pm_recipients is not None: msg = inspect.cleandoc( f""" [Preview]({reddit_post.url}) [Source]({reddit_src_post.url}) - Failed: {", ".join(failed_dirnames)} """ ) for recipient in pm_recipients: - self.reddit.send_pm(recipient, title, msg) + reddit.send_pm(recipient, title, msg) - logger.info("Execution took %s seconds", int(time.time() - start_time)) - logger.info("-------------------------------------------------------------------------------------------------") - - # Clean requests cache after each successful generation so it doesn't grow indefinitely - self.cache.remove_expired_responses() - - def load_already_posted(self): - try: - with config.DATA_DIR.joinpath("already_posted").open() as file: - return set(line.rstrip() for line in file.readlines()) - except FileNotFoundError: - return set() - - def save_already_posted(self, already_posted): - logger.info("Saving already posted to file") - with config.DATA_DIR.joinpath("already_posted").open("w") as file: - for dirname in already_posted: - file.write("{}\n".format(dirname)) + logger.info("Execution took %s seconds", int(time.time() - start_time)) + logger.info("-------------------------------------------------------------------------------------------------") -def main(): - bot = DailyReleasesBot() - bot.run() +def load_already_posted() -> Set[str]: + try: + with DATA_DIR.joinpath("already_posted").open() as file: + return {line.rstrip() for line in file.readlines()} + except FileNotFoundError: + return set() + + +def save_already_posted(already_posted) -> None: + logger.info("Saving already posted to file") + with DATA_DIR.joinpath("already_posted").open("w") as file: + file.writelines(already_posted) + + +def main() -> None: + print(f"Starting Daily Releases Bot v{__version__}") + mode = CONFIG["main"]["mode"] + logger.info("Mode is %s", mode) + + if mode == "test": + generate(post=False) + if mode == "immediately": + generate(post=True, pm_recipients=CONFIG["reddit"]["notify_users"].split(",")) + if mode == "reply": + listen_inbox() if __name__ == '__main__': diff --git a/dailyreleases/parsing.py b/dailyreleases/parsing.py new file mode 100644 index 0000000..b5a49cb --- /dev/null +++ b/dailyreleases/parsing.py @@ -0,0 +1,227 @@ +import logging +import re +import string +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from enum import Enum +from typing import Optional, List, Set, Dict, Iterable + +from . import stores +from .predbs import PredbRelease +from .stores import steam + +logger = logging.getLogger(__name__) + + +class ReleaseType(str, Enum): + GAME = "Game" + UPDATE = "Update" + DLC = "DLC" + + def __str__(self) -> str: + return self.value + + +class Platform(str, Enum): + WINDOWS = "Windows" + OSX = "Mac OSX" + LINUX = "Linux" + + def __str__(self) -> str: + return self.value + + +@dataclass +class Release: + dirname: str + rls_name: str # dirname without group + group: str + game_name: str + type: ReleaseType + platform: Platform + nfo_link: str + store_links: Dict[str, str] = field(default_factory=dict) + tags: List[str] = field(default_factory=list) + highlights: List[str] = field(default_factory=list) + score: int = -1 # score and number of reviews is -1 by default; it is updated if the game exists on Steam + num_reviews: int = -1 + + +STOPWORDS = ( + "update", + "v[0-9]+", + "build[._-]?[0-9]+", + "iNTERNAL", + "incl", + "Standalone", + "Multilanguage", + "DLC", + "DLC[._-]?Unlocker", + "Steam[._-]?Edition", + "GOG", + "mac[._-]?os[._-]?x?", + "linux", +) + +TAGS = ( + "Hotfix", + "Crack[._-]?fix", + "Dir[._-]?fix", + "MULTI[._-]?[0-9]+", + "x(?:86|64)", + "(?:86|64)[._-]?bit", + "RIP", + "REPACK", + "German", + "Czech", + "Russian", + "Korean", + "Italian", + "Swedish", + "Danish", + "French", + "Slovak", +) + +HIGHLIGHTS = ( + "PROPER", + "READNFO", +) + +BLACKLISTED = ( + "Keygen", + "Keymaker", + "[._-]3DS", + "[._-]NSW", + "[._-]PS4", + "[._-]PSP", + "[._-]Wii", + "[._-]WiiU", + "x264", + "720p", + "1080p", + "eBook", + "TUTORIAL", + "Debian", + "Ubuntu", + "Fedora", + "openSUSE", + "jQuery", + "CSS" + "ASP[._-]NET", + "Windows[._-]Server", + "Lynda", + "OREILLY" + "Wintellectnow", + "3ds[._-]?Max", + "For[._-]Maya", + "Cinema4D", +) + + +def parse_dirname(dirname: str, nfo_link: str) -> Optional[Release]: + logger.info("---") + logger.info("Parsing: %s", dirname) + + # Extract group name + rls_name, group = dirname.rsplit("-", maxsplit=1) + + # Find game name by matching until one of the stopwords + game_name, *stopwords = re.split("[._-]({})".format("|".join(STOPWORDS + TAGS + HIGHLIGHTS)), + rls_name, flags=re.IGNORECASE) + + # Prettify game name by substituting word delimiters with spaces and capitalizing each word. + game_name = string.capwords(re.sub("[_-]", " ", game_name)) + # Dots separated by fewer than two letters are not substituted to allow titles like "R.O.V.E.R." + game_name = string.capwords(re.sub("[.]([a-zA-Z]{2,}|[0-9]+)", " \g<1>", game_name)) + + # Some stopwords distinguishes two, otherwise identical, releases (e.g. x86/x64) - we call these tags + tags = [stopword + for stopword in stopwords + if re.match("|".join(TAGS), stopword, flags=re.IGNORECASE)] + + # Some stopwords signify an important piece of information and deserve to be highlighted (e.g. PROPER) + highlights = [stopword + for stopword in stopwords + if re.match("|".join(HIGHLIGHTS), stopword, flags=re.IGNORECASE)] + + # Find platform + if re.search("mac[._-]?os[._-]?x?", rls_name, flags=re.IGNORECASE): + platform = Platform.OSX + elif re.search("linux", rls_name, flags=re.IGNORECASE): + platform = Platform.LINUX + else: + platform = Platform.WINDOWS + + # Find release type (Game/DLC/Update) + # Order of the if-statements is important: Update trumps DLC because an update to a DLC is an update, not a DLC! + if re.search("update|v[0-9]|addon|Crack[._-]?fix|DIR[._-]?FIX|build[._-]?[0-9]+", rls_name, flags=re.IGNORECASE): + rls_type = ReleaseType.UPDATE + elif re.search("(? ParsedReleases: + parsed_releases = {platform: {release_type: [] for release_type in ReleaseType} + for platform in Platform} # {Windows: {Game: [..], DLC: [..], ..}, Linux: ...} + + for release in releases: + if release.dirname in already_posted: + logger.info("Skipping %s: dirname in already posted", release.dirname) + continue + + if re.search("|".join(BLACKLISTED), release.dirname, flags=re.IGNORECASE): + logger.info("Skipping %s: contains blacklisted word", release.dirname) + continue + + if release.timestamp < datetime.now() - timedelta(hours=48): + logger.info("Skipping %s: older than 48 hours (but not in already_posted!?)", release.dirname) + continue + + release = parse_dirname(release.dirname, release.nfo_link) + if not release: + continue # skip if there is no data about the release (e.g. if it is deemed a non-game by parse_dirname) + + # Add release to dict of parsed releases by platform and type + parsed_releases[release.platform][release.type].append(release) + already_posted.add(release.dirname) + + logger.debug("Parsed releases: %s", parsed_releases) + return parsed_releases diff --git a/dailyreleases/predb.py b/dailyreleases/predb.py deleted file mode 100644 index cc9a4a5..0000000 --- a/dailyreleases/predb.py +++ /dev/null @@ -1,63 +0,0 @@ -import logging -from datetime import datetime - -import requests -from bs4 import BeautifulSoup - -logger = logging.getLogger(__name__) - - -class Predb(object): - def __init__(self, cache) -> None: - self.cache = cache - - def get_releases(self): - logger.info("Getting releases from predbs") - - releases = {} - for db_releases in (self._get_predbme, self._get_xrel): # in reverse order of preference - try: - releases.update(db_releases()) - except requests.exceptions.ConnectionError as e: - logger.error(e) - logger.warning("Connection to predb failed, skipping..") - - return releases - - def _get_xrel(self, categories=("CRACKED", "UPDATE"), num_pages=2): - logger.debug("Getting releases from xrel.to") - - def get_releases_in_category(category, page): - payload = { - "category_name": category, - "ext_info_type": "game", - "per_page": 100, - "page": page - } - r = self.cache.get("https://api.xrel.to/v2/release/browse_category.json", params=payload) - return r.json()["list"] - - return {rls["dirname"]: (rls["link_href"], datetime.fromtimestamp(rls["time"])) - for category in categories - for page in range(1, num_pages) - for rls in get_releases_in_category(category, page)} - - def _get_srrdb(self, num_pages=3): - logger.debug("Getting releases from srrdb.com") - - return {rls["release"]: ("https://www.srrdb.com/release/details/{}".format(rls['release']), - datetime.strptime(rls["date"], "%Y-%m-%d %H:%M:%S")) - for p in range(1, num_pages) - for rls in - self.cache.get(f"https://www.srrdb.com/api/search/category:pc/order:date-desc/{p}").json()["results"]} - - def _get_predbme(self): - logger.debug("Getting releases from predb.me") - - r = self.cache.get("https://predb.me/?cats=games-pc&rss=1") - soup = BeautifulSoup(r.text, "html.parser").find_all("item") - - # Predb.me doesn't show timestamps in the RSS-feed, but the feed is so short it only shows ~72 hours worth of - # releases anyway, so we just set timestamp to now. - return {item.find("title").text: (item.find("guid").text, datetime.utcnow()) - for item in soup} diff --git a/dailyreleases/predbs.py b/dailyreleases/predbs.py new file mode 100644 index 0000000..d0e5b31 --- /dev/null +++ b/dailyreleases/predbs.py @@ -0,0 +1,64 @@ +import logging +from datetime import datetime +from typing import NamedTuple, List +from urllib.error import HTTPError + +from bs4 import BeautifulSoup + +from . import cache + +logger = logging.getLogger(__name__) + + +class PredbRelease(NamedTuple): + dirname: str + nfo_link: str + timestamp: datetime + + +def get_releases() -> List[PredbRelease]: + logger.info("Getting releases from predbs") + + releases = {} + for db_releases in (get_predbme, get_xrel): # in reverse order of preference + try: + releases.update((r.dirname, r) for r in db_releases()) # override duplicate dirnames in later iterations + except HTTPError as e: + logger.error(e) + logger.warning("Connection to predb failed, skipping..") + + return list(releases.values()) + + +def get_xrel(categories=("CRACKED", "UPDATE"), num_pages=2) -> List[PredbRelease]: + logger.debug("Getting releases from xrel.to") + + def get_releases_in_category(category, page): + r = cache.get("https://api.xrel.to/v2/release/browse_category.json", params={ + "category_name": category, + "ext_info_type": "game", + "per_page": 100, + "page": page + }) + return r.json["list"] + + return [PredbRelease(rls["dirname"], + rls["link_href"], + datetime.fromtimestamp(rls["time"])) + for category in categories + for page in range(1, num_pages) + for rls in get_releases_in_category(category, page)] + + +def get_predbme() -> List[PredbRelease]: + logger.debug("Getting releases from predb.me") + + rss = cache.get("https://predb.me/?cats=games-pc&rss=1") + soup = BeautifulSoup(rss.text, "html.parser").find_all("item") + + # Predb.me doesn't show timestamps in the RSS-feed, but the feed is so short it only shows ~72 hours worth of + # releases anyway, so we just set timestamp to now. + return [PredbRelease(item.find("title").text, + item.find("guid").text, + datetime.utcnow()) + for item in soup] diff --git a/dailyreleases/reddit.py b/dailyreleases/reddit.py index 83e9d8d..1ae2e4c 100644 --- a/dailyreleases/reddit.py +++ b/dailyreleases/reddit.py @@ -1,25 +1,26 @@ import logging import praw -import praw.models.reddit.submission +from praw.models import Submission + +from .config import CONFIG logger = logging.getLogger(__name__) -class Reddit(object): - def __init__(self, config) -> None: - self.config = config +praw = praw.Reddit(**CONFIG["reddit"]) - logger.info("Logging in to reddit") - self.praw = praw.Reddit(**self.config["reddit"]) - def send_pm(self, recipient, title, text): - logger.info("Sending PM to u/%s", recipient) - return self.praw.redditor(recipient).message(title, text) +def send_pm(recipient, title, text) -> None: + logger.info("Sending PM to u/%s", recipient) + return praw.redditor(recipient).message(title, text) - def submit_post(self, title, text, subreddit): - logger.info("Submitting post to r/%s", subreddit) - return self.praw.subreddit(subreddit).submit(title, text) - def get_previous_daily_post(self, subreddit) -> praw.models.reddit.submission.Submission: - return next(self.praw.subreddit(subreddit).search("daily release", sort="new", time_filter="week")) +def submit_post(title, text, subreddit) -> Submission: + logger.info("Submitting post to r/%s", subreddit) + return praw.subreddit(subreddit).submit(title, text) + + +def get_previous_daily_post(subreddit) -> Submission: + logger.info("Getting previous daily post from r/%s", subreddit) + return next(praw.subreddit(subreddit).search("daily release", sort="new", time_filter="week")) diff --git a/dailyreleases/steam.py b/dailyreleases/steam.py deleted file mode 100644 index 8bfaf56..0000000 --- a/dailyreleases/steam.py +++ /dev/null @@ -1,75 +0,0 @@ -import logging -from collections import namedtuple - -from bs4 import BeautifulSoup - -from dailyreleases import util - -logger = logging.getLogger(__name__) - - -class Steam(object): - def __init__(self, cache) -> None: - self.cache = cache - - def appdetails(self, appid): - payload = { - "appids": appid - } - r = self.cache.get("https://store.steampowered.com/api/appdetails", params=payload) - return r.json()[appid]["data"] - - def packagedetails(self, appid): - payload = { - "packageids": appid - } - r = self.cache.get("https://store.steampowered.com/api/packagedetails", params=payload) - return r.json()[appid]["data"] - - def appreviews(self, appid): - payload = { - "start_date": -1, - "end_date": -1, - "filter": "summary", - "language": "all", - "purchase_type": "all", - "json": 1 - } - r = self.cache.get(f"https://store.steampowered.com/appreviews/{appid}", params=payload) - return r.json()["query_summary"] - - def reviews(self, appid): - app_review = self.appreviews(appid) - Reviews = namedtuple("Reviews", ("score", "num")) - - if app_review["total_reviews"] == 0: - return Reviews(-1, -1) - - positive = app_review["total_positive"] / app_review["total_reviews"] - return Reviews(positive, app_review["total_reviews"]) - - def eula(self, appid): - r = self.cache.get(f"https://store.steampowered.com//eula/{appid}_eula_0") - soup = BeautifulSoup(r.text, "html.parser").find(id="eula_content") - if soup is not None: - return soup.text - return "" - - def search(self, query): - logger.debug("Searching Steam store for %s", query) - payload = { - "term": query - } - # Reverse results to make the first one take precedence over later ones if multiple results have the same name. - # E.g. "Wolfenstein II: The New Colossus" has both international and german version under the same name. - items = {i["name"]: i for i in reversed(self.cache.get("https://store.steampowered.com/api/storesearch", - params=payload).json()["items"])} - - best_match = util.case_insensitive_close_matches(query, items, n=1, cutoff=0.90) - - if not best_match: - logger.debug("Unable to find %s in Steam search results", query) - return - - logger.debug("Best match is '%s'", best_match[0]) - return "https://store.steampowered.com/{type}/{id}".format(**items[best_match[0]]) diff --git a/dailyreleases/stores/__init__.py b/dailyreleases/stores/__init__.py new file mode 100644 index 0000000..92410b0 --- /dev/null +++ b/dailyreleases/stores/__init__.py @@ -0,0 +1,59 @@ +import logging +import re +from typing import Dict, List +from urllib.error import HTTPError + +from .. import cache +from ..config import CONFIG +from ..stores import steam, gog + +logger = logging.getLogger(__name__) + + +def web_search(query: str) -> List[str]: + logger.debug("Searching Google for %s", query) + try: + r = cache.get("https://www.googleapis.com/customsearch/v1", params={ + "key": CONFIG["google"]["key"], + "cx": CONFIG["google"]["cx"], + "q": query + }) + return [result["link"] for result in r.json["items"]] + except (KeyError, HTTPError) as e: + logger.exception(e) + logger.warning("Google search failed (probably rate-limited)") + return [] + + +def find_store_links(game_name: str) -> Dict[str, str]: + links = {} + for store, name in ((steam, "Steam"), (gog, "GOG")): + link = store.search(game_name) + if link is not None: + links[name] = link + + if links: + return links + + # If none of those worked, try Googling the game + known_stores = { + "store.steampowered.com/(app|sub|bundle)": "Steam", # order doesn't matter + "gog.com/game": "GOG", + "origin.com": "Origin", + "ubi(soft)?.com": "Ubisoft", + "www.microsoft.com/.*p": "Microsoft Store", + "itch.io": "Itch.io", + "bigfishgames.com/games": "Big Fish Games", + "gamejolt.com": "Game Jolt", + "alawar.com": "Alawar", + "wildtangent.com": "WildTangent Games" + } + + # Multiple store links are sometimes returned, but we believe in Google's algorithm and choose the first one + for link in web_search(f"{game_name} buy"): + for store_url, store_name in known_stores.items(): + if re.search(store_url, link, flags=re.IGNORECASE): + return {store_name: link} + + logger.debug("Unable to find store links for %s", game_name) + return {} diff --git a/dailyreleases/stores/gog.py b/dailyreleases/stores/gog.py new file mode 100644 index 0000000..80acf21 --- /dev/null +++ b/dailyreleases/stores/gog.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import logging +from typing import Optional + +from .. import util, cache + +logger = logging.getLogger(__name__) + + +def search(query: str) -> Optional[str]: + logger.debug("Searching GOG for %s", query) + r = cache.get("https://www.gog.com/games/ajax/filtered", params={ + "search": query, + "mediaType": "game", + "limit": 5 + }) + products = {p["title"]: p + for p in r.json["products"] + if p["isGame"]} + + try: + best_match = products[util.case_insensitive_close_matches(query, products, n=1, cutoff=0.90)[0]] + logger.debug("Best match is '%s'", best_match) + return "https://gog.com{url}".format(**best_match) + except IndexError: + logger.debug("Unable to find %s in GOG search results", query) + return None diff --git a/dailyreleases/stores/steam.py b/dailyreleases/stores/steam.py new file mode 100644 index 0000000..98f87b8 --- /dev/null +++ b/dailyreleases/stores/steam.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +import logging +import re +from typing import TypeVar, Optional, Tuple + +from bs4 import BeautifulSoup + +from .. import util, cache + +logger = logging.getLogger(__name__) + +AppID = TypeVar("AppID", int, str) + + +def appdetails(appid: AppID) -> dict: + r = cache.get("https://store.steampowered.com/api/appdetails", params={ + "appids": appid + }) + return r.json[str(appid)]["data"] + + +def packagedetails(appid: AppID) -> dict: + r = cache.get("https://store.steampowered.com/api/packagedetails", params={ + "packageids": appid + }) + return r.json[str(appid)]["data"] + + +def appreviews(appid: AppID) -> dict: + r = cache.get(f"https://store.steampowered.com/appreviews/{appid}", params={ + "start_date": -1, + "end_date": -1, + "filter": "summary", + "language": "all", + "purchase_type": "all", + "json": 1 + }) + return r.json["query_summary"] + + +def reviews(appid: AppID) -> Tuple[int, int]: + app_review = appreviews(appid) + + if app_review["total_reviews"] == 0: + return -1, -1 + + positive = app_review["total_positive"] / app_review["total_reviews"] + return positive, app_review["total_reviews"] + + +def eula(appid: AppID) -> str: + r = cache.get(f"https://store.steampowered.com//eula/{appid}_eula_0") + soup = BeautifulSoup(r.text, "html.parser").find(id="eula_content") + if soup is not None: + return soup.text + return "" + + +def search(query: str) -> Optional[str]: + logger.debug("Searching Steam store for %s", query) + r = cache.get("https://store.steampowered.com/search/suggest", params={ + "term": query, + "f": "json", + "cc": "US", + "l": "english" + }) + + # Reverse results to make the first one take precedence over later ones if multiple results have the same name. + # E.g. "Wolfenstein II: The New Colossus" has both international and german version under the same name. + items = {item["name"]: item for item in reversed(r.json)} + + try: + best_match = items[util.case_insensitive_close_matches(query, items, n=1, cutoff=0.90)[0]] + logger.debug("Best match is '%s'", best_match) + type_to_slug = { + "game": "app", + "dlc": "app", + "bundle": "bundle" + } + slug = type_to_slug.get(best_match['type'], best_match['type']) + return f"https://store.steampowered.com/{slug}/{best_match['id']}" + except IndexError: + logger.debug("Unable to find %s in Steam search results", query) + return None + + +def update_info(link: str, release: Release) -> None: + logger.debug("Getting information about game using Steam API") + link_type, appid = re.search("(app|sub|bundle)(?:/)([0-9]+)", link).groups() + + if link_type == "bundle": + logger.debug("Steam link is to bundle: not utilizing API") # Steam has no public API for bundles + return + + # If the link is a package on Steam (e.g. game + dlc), we need to find the base game of the package + if link_type == "sub": + package_details = packagedetails(appid) + + # Set game name to package name (e.g. 'Fallout New Vegas Ultimate' instead of 'Fallout New Vegas') + release.game_name = package_details["name"] + + # Use the "base game" of the package as the basis for further computation. + # We guesstimate the base game as the most popular app (i.e. the one with the most reviews) + package_appids = [app["id"] for app in package_details["apps"]] + package_apps_details = [appdetails(appid) for appid in package_appids] + details = max(package_apps_details, key=lambda app: reviews(app["steam_appid"])[1]) + appid = details["steam_appid"] + + # Otherwise, if the release is a single game on Steam + else: + details = appdetails(appid) + release.game_name = details["name"] + + # Now that we have a single Steam game to represent the release, use it to improve the information + release.score, release.num_reviews = reviews(appid) + + # DLC releases don't always contain the word "dlc" (e.g. 'Fallout New Vegas: Dead Money'), so some DLCs get + # mislabeled as games during offline parsing. We can use Steam's API to get the correct type, but if the release was + # already deemed an update, keep it as such, because an update to a DLC is an update. + if details["type"] == "dlc" and release.type != "Update": + release.type = "DLC" + + # Add highlight if "denuvo" occurs in Steam's DRM notice or potential 3rd-party EULA + if "denuvo" in (details.get("drm_notice", "") + eula(appid)).lower(): + logger.info("'denuvo' found in Steam DRM-notice/EULA; adding 'DENUVO' to highlights") + release.highlights.append("DENUVO") diff --git a/dailyreleases/web.py b/dailyreleases/web.py deleted file mode 100644 index 507e515..0000000 --- a/dailyreleases/web.py +++ /dev/null @@ -1,31 +0,0 @@ -import logging - -import requests - -logger = logging.getLogger(__name__) - - -class Web(object): - def __init__(self, config, cache) -> None: - self.config = config - self.cache = cache - - def search(self, query) -> list: - try: - return self._google_search(query) - except (KeyError, requests.RequestException) as e: - logger.exception(e) - logger.warning("Google search failed (probably rate-limited)") - return [] - - def _google_search(self, query) -> list: - logger.debug("Searching Google for %s", query) - - payload = { - "key": self.config["google"]["key"], - "cx": self.config["google"]["cx"], - "q": query - } - r = self.cache.get("https://www.googleapis.com/customsearch/v1", params=payload) - - return [result["link"] for result in r.json()["items"]] diff --git a/setup.py b/setup.py index ad79f25..23df04c 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ from setuptools import setup, find_packages from codecs import open from os import path -from dailyreleases import __author__, __version__, __licence__ +from .dailyreleases import __author__, __version__, __licence__ here = path.abspath(path.dirname(__file__)) @@ -37,8 +37,6 @@ setup( ] }, install_requires=[ - "requests", - "requests_cache", "praw", "beautifulsoup4" ], diff --git a/tests/test_parse_dirname.py b/tests/test_parse_dirname.py index d7367b4..3c4a801 100644 --- a/tests/test_parse_dirname.py +++ b/tests/test_parse_dirname.py @@ -1,140 +1,160 @@ import unittest -from dailyreleases.main import DailyReleasesBot +from dailyreleases import parsing +from dailyreleases.parsing import ReleaseType, Platform class ParseDirnameTestCase(unittest.TestCase): - def setUp(self): - self.bot = DailyReleasesBot() - def test_single_word_release(self): - p = self.bot.parse_dirname("Aztez-DARKSiDERS") + p = parsing.parse_dirname("Aztez-DARKSiDERS", "nfo_link") - self.assertEqual("Aztez-DARKSiDERS", p["dirname"]) - self.assertEqual("Aztez", p["rls_name"]) - self.assertEqual("Aztez", p["game_name"]) - self.assertEqual("Windows", p["platform"]) - self.assertEqual("Game", p["type"]) - self.assertEqual("DARKSiDERS", p["group"]) - self.assertIn("store.steampowered.com/app/244750", p["store_links"]["Steam"]) - self.assertEqual([], p["tags"]) - self.assertEqual([], p["highlights"]) + self.assertEqual("Aztez-DARKSiDERS", p.dirname) + self.assertEqual("Aztez", p.rls_name) + self.assertEqual("Aztez", p.game_name) + self.assertEqual(Platform.WINDOWS, p.platform) + self.assertEqual(ReleaseType.GAME, p.type) + self.assertEqual("DARKSiDERS", p.group) + self.assertIn("store.steampowered.com/app/244750", p.store_links["Steam"]) + self.assertEqual([], p.tags) + self.assertEqual([], p.highlights) def test_nuked_release(self): # TODO: Actual nuke handling? - p = self.bot.parse_dirname("Battlefield.1-CPY") - self.assertEqual("Battlefield.1-CPY", p["dirname"]) + p = parsing.parse_dirname("Battlefield.1-CPY", "nfo_link") + self.assertEqual("Battlefield.1-CPY", p.dirname) def test_update(self): - p = self.bot.parse_dirname("Car.Mechanic.Simulator.2018.Plymouth.Update.v1.5.1.Hotfix-PLAZA") - self.assertEqual("Update", p["type"]) + p = parsing.parse_dirname("Car.Mechanic.Simulator.2018.Plymouth.Update.v1.5.1.Hotfix-PLAZA", "nfo_link") + self.assertEqual(ReleaseType.UPDATE, p.type) + self.assertIn("store.steampowered.com/app/754920", p.store_links["Steam"]) def test_proper_highlight(self): - p = self.bot.parse_dirname("Death.Coming.PROPER-SiMPLEX") - self.assertEqual(["PROPER"], p["highlights"]) + p = parsing.parse_dirname("Death.Coming.PROPER-SiMPLEX", "nfo_link") + self.assertEqual(["PROPER"], p.highlights) + self.assertIn("store.steampowered.com/app/705120", p.store_links["Steam"]) def test_macos_release(self): - p = self.bot.parse_dirname("The_Fall_Part_2_Unbound_MacOS-Razor1911") - self.assertEqual("Mac OSX", p["platform"]) - self.assertEqual("Game", p["type"]) + p = parsing.parse_dirname("The_Fall_Part_2_Unbound_MacOS-Razor1911", "nfo_link") + self.assertEqual(Platform.OSX, p.platform) + self.assertEqual(ReleaseType.GAME, p.type) + self.assertIn("store.steampowered.com/app/510490", p.store_links["Steam"]) + self.assertIn("gog.com/game/the_fall_part_2_unbound", p.store_links["GOG"]) def test_macosx_update(self): - p = self.bot.parse_dirname("Man_O_War_Corsair_Warhammer_Naval_Battles_v1.3.2_MacOSX-Razor1911") - self.assertEqual("Mac OSX", p["platform"]) - self.assertEqual("Update", p["type"]) + p = parsing.parse_dirname("Man_O_War_Corsair_Warhammer_Naval_Battles_v1.3.2_MacOSX-Razor1911", "nfo_link") + self.assertEqual(Platform.OSX, p.platform) + self.assertEqual(ReleaseType.UPDATE, p.type) + self.assertIn("store.steampowered.com/app/344240", p.store_links["Steam"]) + self.assertIn("gog.com/game/man_o_war_corsair", p.store_links["GOG"]) def test_linux_release(self): - p = self.bot.parse_dirname("Sphinx_And_The_Cursed_Mummy_Linux-Razor1911") - self.assertEqual("Linux", p["platform"]) - self.assertEqual("Game", p["type"]) + p = parsing.parse_dirname("Sphinx_And_The_Cursed_Mummy_Linux-Razor1911", "nfo_link") + self.assertEqual(Platform.LINUX, p.platform) + self.assertEqual(ReleaseType.GAME, p.type) + self.assertIn("store.steampowered.com/app/606710", p.store_links["Steam"]) + self.assertIn("gog.com/game/sphinx_and_the_cursed_mummy", p.store_links["GOG"]) def test_dlc_explicit(self): - p = self.bot.parse_dirname("Fallout.4.Far.Harbor.DLC-CODEX") - self.assertEqual("DLC", p["type"]) + p = parsing.parse_dirname("Fallout.4.Far.Harbor.DLC-CODEX", "nfo_link") + self.assertIn("store.steampowered.com/app/435881", p.store_links["Steam"]) + self.assertEqual(ReleaseType.DLC, p.type) def test_dlc_implicit(self): - p = self.bot.parse_dirname("Euro.Truck.Simulator.2.Italia-CODEX") - self.assertEqual("DLC", p["type"]) - self.assertIn("store.steampowered.com/app/558244", p["store_links"]["Steam"]) + p = parsing.parse_dirname("Euro.Truck.Simulator.2.Italia-CODEX", "nfo_link") + self.assertEqual(ReleaseType.DLC, p.type) + self.assertIn("store.steampowered.com/app/558244", p.store_links["Steam"]) def test_incl_dlc_update(self): - p = self.bot.parse_dirname("Wolfenstein.II.The.New.Colossus.Update.5.incl.DLC-CODEX") - self.assertEqual("Update", p["type"]) + p = parsing.parse_dirname("Wolfenstein.II.The.New.Colossus.Update.5.incl.DLC-CODEX", "nfo_link") + self.assertEqual(ReleaseType.UPDATE, p.type) + self.assertIn("store.steampowered.com/app/612880", p.store_links["Steam"]) def test_incl_dlc_release(self): - p = self.bot.parse_dirname("Mutiny.Incl.DLC-DARKSiDERS") - self.assertEqual("Game", p["type"]) + p = parsing.parse_dirname("Mutiny.Incl.DLC-DARKSiDERS", "nfo_link") + self.assertEqual(ReleaseType.GAME, p.type) def test_score_steam(self): - p1 = self.bot.parse_dirname("BioShock_Infinite-FLT") - p2 = self.bot.parse_dirname("Duke.Nukem.Forever.Complete-PLAZA") - self.assertGreater(p1["score"], p2["score"]) + p1 = parsing.parse_dirname("BioShock_Infinite-FLT", "nfo_link") + self.assertIn("store.steampowered.com/app/8870", p1.store_links["Steam"]) + p2 = parsing.parse_dirname("Duke.Nukem.Forever.Complete-PLAZA", "nfo_link") + self.assertIn("store.steampowered.com/app/57900", p2.store_links["Steam"]) + self.assertGreater(p1.score, p2.score) def test_non_steam(self): - p = self.bot.parse_dirname("Battlefield.1.REPACK-CPY") - self.assertIn("www.origin.com/usa/en-us/store/battlefield/battlefield-1", p["store_links"]["Origin"]) - self.assertEqual(-1, p["score"]) - self.assertEqual(-1, p["num_reviews"]) + p = parsing.parse_dirname("Battlefield.1.REPACK-CPY", "nfo_link") + self.assertIn("www.origin.com/usa/en-us/store/battlefield/battlefield-1", p.store_links["Origin"]) + self.assertEqual(-1, p.score) + self.assertEqual(-1, p.num_reviews) def test_gog_exclusive(self): - p = self.bot.parse_dirname("Dungeons.and.Dragons.Dragonshard.v2.0.0.10.Multilingual-DELiGHT") - self.assertIn("gog.com/game/dungeons_dragons_dragonshard", p["store_links"]["GOG"]) - self.assertEqual(-1, p["score"]) + # TODO: Actually use GOG API (gog.update_info) + p = parsing.parse_dirname("Dungeons.and.Dragons.Dragonshard.v2.0.0.10.Multilingual-DELiGHT", "nfo_link") + self.assertIn("gog.com/game/dungeons_dragons_dragonshard", p.store_links["GOG"]) + self.assertEqual(-1, p.score) + + def test_gog_exclusive2(self): + p = parsing.parse_dirname("Diablo.GOG.Classic-KaliMaaShaktiDe", "nfo_link") + self.assertIn("gog.com/game/diablo", p.store_links["GOG"]) def test_score_non_steam(self): - p = self.bot.parse_dirname("Ode.RIP.MULTI12-SiMPLEX") - self.assertEqual(-1, p["score"]) + p = parsing.parse_dirname("Ode.RIP.MULTI12-SiMPLEX", "nfo_link") + self.assertEqual(-1, p.score) def test_tags(self): - p = self.bot.parse_dirname("Teenage.Mutant.Ninja.Turtles.Portal.Power.RIP.MULTI8-SiMPLEX") - self.assertEqual(["RIP", "MULTI8"], p["tags"]) + p = parsing.parse_dirname("The.Curious.Expedition.v1.3.7.1.MULTI.7.RIP-Unleashed", "nfo_link") + self.assertIn("gog.com/game/curious_expedition_the", p.store_links["GOG"]) + self.assertEqual(["MULTI.7", "RIP"], p.tags) def test_skip_software(self): - p = self.bot.parse_dirname("Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED") + p = parsing.parse_dirname("Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED", "nfo_link") self.assertIsNone(p) def test_steam_package(self): - p = self.bot.parse_dirname("Farming.Simulator.17.Platinum.Edition.Update.v1.5.3-BAT") - self.assertEqual("Farming Simulator 17 - Platinum Edition", p["game_name"]) - self.assertEqual("Update", p["type"]) - self.assertIn("store.steampowered.com/sub/202103", p["store_links"]["Steam"]) + p = parsing.parse_dirname("Farming.Simulator.17.Platinum.Edition.Update.v1.5.3-BAT", "nfo_link") + self.assertEqual("Farming Simulator 17 - Platinum Edition", p.game_name) + self.assertEqual(ReleaseType.UPDATE, p.type) + self.assertIn("store.steampowered.com/sub/202103", p.store_links["Steam"]) def test_steam_package_with_dlc_first(self): - p = self.bot.parse_dirname("The.Witcher.3.Wild.Hunt.Game.of.The.Year.Edition-RELOADED") - self.assertEqual("The Witcher 3: Wild Hunt - Game of the Year Edition", p["game_name"]) - self.assertEqual("Game", p["type"]) - self.assertIn("store.steampowered.com/sub/124923", p["store_links"]["Steam"]) + p = parsing.parse_dirname("The.Witcher.3.Wild.Hunt.Game.of.The.Year.Edition-RELOADED", "nfo_link") + self.assertEqual("The Witcher 3: Wild Hunt - Game of the Year Edition", p.game_name) + self.assertEqual(ReleaseType.GAME, p.type) + self.assertIn("store.steampowered.com/sub/124923", p.store_links["Steam"]) def test_steam_bundle(self): - p = self.bot.parse_dirname("Valve.Complete.Pack-FAKE") - self.assertEqual("Valve.Complete.Pack-FAKE", p["dirname"]) - self.assertEqual("Valve Complete Pack", p["game_name"]) - self.assertEqual("Windows", p["platform"]) - self.assertEqual("Game", p["type"]) - self.assertIn("store.steampowered.com/bundle/232", p["store_links"]["Steam"]) + p = parsing.parse_dirname("Valve.Complete.Pack-FAKE", "nfo_link") + self.assertEqual("Valve.Complete.Pack-FAKE", p.dirname) + self.assertEqual("Valve Complete Pack", p.game_name) + self.assertEqual("Windows", p.platform) + self.assertEqual(ReleaseType.GAME, p.type) + self.assertIn("store.steampowered.com/bundle/232", p.store_links["Steam"]) def test_steam_denuvo(self): # "denuvo" occurs in the Steam EULA - p = self.bot.parse_dirname("Deus.Ex.Mankind.Divided-CPY") - self.assertEqual(["DENUVO"], p["highlights"]) + p = parsing.parse_dirname("Deus.Ex.Mankind.Divided-CPY", "nfo_link") + self.assertEqual(["DENUVO"], p.highlights) # "denuvo" occurs in the Steam DRM notice - p = self.bot.parse_dirname("Yakuza.0-FAKE") - self.assertEqual(["DENUVO"], p["highlights"]) + p = parsing.parse_dirname("Yakuza.0-FAKE", "nfo_link") + self.assertEqual(["DENUVO"], p.highlights) def test_episode_release(self): - p = self.bot.parse_dirname("Life.is.Strange.Before.the.Storm.Episode.3-CODEX") - self.assertEqual("Life is Strange: Before the Storm Episode 3", p["game_name"]) - self.assertEqual("DLC", p["type"]) - self.assertIn("store.steampowered.com/app/704740", p["store_links"]["Steam"]) + p = parsing.parse_dirname("Life.is.Strange.Before.the.Storm.Episode.3-CODEX", "nfo_link") + self.assertEqual("Life is Strange: Before the Storm Episode 3", p.game_name) + self.assertEqual(ReleaseType.DLC, p.type) + self.assertIn("store.steampowered.com/app/704740", p.store_links["Steam"]) def test_season_and_episode_release(self): - p = self.bot.parse_dirname("Minecraft.Story.Mode.Season.Two.Episode.5.MacOSX-RELOADED") - self.assertEqual("Minecraft Story Mode Season Two Episode 5", p["game_name"]) + p = parsing.parse_dirname("Minecraft.Story.Mode.Season.Two.Episode.5.MacOSX-RELOADED", "nfo_link") + self.assertEqual("Minecraft Story Mode Season Two Episode 5", p.game_name) def test_build_is_update(self): - p = self.bot.parse_dirname("DUSK.Episode.1.Build.2.6-SKIDROW") - self.assertEqual("Update", p["type"]) + p = parsing.parse_dirname("DUSK.Episode.1.Build.2.6-SKIDROW", "nfo_link") + self.assertEqual(ReleaseType.UPDATE, p.type) + + def test_prefer_steam_to_microsoft_store(self): + p = parsing.parse_dirname("Forgiveness-PLAZA", "nfo_link") + self.assertIn("store.steampowered.com/app/971120", p.store_links["Steam"]) if __name__ == '__main__': diff --git a/tests/test_parse_releases.py b/tests/test_parse_releases.py index 64f7034..0d33212 100644 --- a/tests/test_parse_releases.py +++ b/tests/test_parse_releases.py @@ -1,44 +1,57 @@ import unittest from datetime import datetime, timedelta -from dailyreleases.main import DailyReleasesBot +from dailyreleases import parsing +from dailyreleases.parsing import Platform, ReleaseType +from dailyreleases.predbs import PredbRelease class ParseReleasesTestCase(unittest.TestCase): - def setUp(self): - self.bot = DailyReleasesBot() + + @classmethod + def setUpClass(cls) -> None: + cls.empty_releases = {platform: {release_type: [] for release_type in ReleaseType} + for platform in Platform} + + def test_dont_skip(self): + already_posted = set() + releases = [ + PredbRelease("Aztez-DARKSiDERS", "nfo_link", datetime.now()) + ] + parsed_releases = parsing.parse_releases(releases, already_posted) + self.assertEqual(parsed_releases[Platform.WINDOWS][ReleaseType.GAME][0].game_name, "Aztez") def test_skip_already_posted(self): already_posted = {"Aztez-DARKSiDERS"} - releases = { - "Aztez-DARKSiDERS": ("nfo_link", datetime.now()) - } - parsed_releases = self.bot.parse_releases(releases, already_posted) - self.assertDictEqual(parsed_releases[0], dict()) + releases = [ + PredbRelease("Aztez-DARKSiDERS", "nfo_link", datetime.now()) + ] + parsed_releases = parsing.parse_releases(releases, already_posted) + self.assertDictEqual(parsed_releases, self.empty_releases) def test_skip_blacklisted_word(self): already_posted = set() - releases = { - "Anthemion.Software.DialogBlocks.v5.15.LINUX.Incl.Keygen-AMPED": ("nfo_link", datetime.now()) - } - parsed_releases = self.bot.parse_releases(releases, already_posted) - self.assertDictEqual(parsed_releases[0], dict()) + releases = [ + PredbRelease("Anthemion.Software.DialogBlocks.v5.15.LINUX.Incl.Keygen-AMPED", "nfo_link", datetime.now()) + ] + parsed_releases = parsing.parse_releases(releases, already_posted) + self.assertDictEqual(parsed_releases, self.empty_releases) def test_skip_older_than_48hr(self): already_posted = set() - releases = { - "Aztez-DARKSiDERS": ("nfo_link", datetime.now() - timedelta(hours=50)) - } - parsed_releases = self.bot.parse_releases(releases, already_posted) - self.assertDictEqual(parsed_releases[0], dict()) + releases = [ + PredbRelease("Aztez-DARKSiDERS", "nfo_link", datetime.now() - timedelta(hours=50)) + ] + parsed_releases = parsing.parse_releases(releases, already_posted) + self.assertDictEqual(parsed_releases, self.empty_releases) def test_skip_no_data_for_software(self): already_posted = set() - releases = { - "Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED": ("nfo_link", datetime.now()) - } - parsed_releases = self.bot.parse_releases(releases, already_posted) - self.assertDictEqual(parsed_releases[0], dict()) + releases = [ + PredbRelease("Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED", "nfo_link", datetime.now()) + ] + parsed_releases = parsing.parse_releases(releases, already_posted) + self.assertDictEqual(parsed_releases, self.empty_releases) if __name__ == '__main__':