
Version 0.0.3.

Change design from OOP to more of a "modules" approach.
Remove dependency on "requests" and "requests_cache".
Implement custom cache wrapper for urllib.
Increase code cohesion.
Update and add further tests.
Casper V. Kristensen 2019-03-08 02:38:55 +01:00
parent 635720f618
commit 4023b095a4
Signed by: caspervk
GPG key ID: 289CA03790535054
18 changed files with 935 additions and 739 deletions
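The heart of the change: the DailyReleasesBot class, which wired every sub-module together by hand, is gone; each module now holds its own state and is imported directly. A minimal before/after sketch of a typical call site, with module paths inferred from the imports in the diffs below:

# 0.0.2: everything hangs off a bot instance
# bot = DailyReleasesBot()
# link = bot.steam.search("Aztez")

# 0.0.3: modules are imported and called as plain functions
from dailyreleases.stores import steam

link = steam.search("Aztez")  # e.g. a store.steampowered.com/app/... URL, or None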


@@ -1,3 +1,3 @@
-__version__ = "0.0.2"
+__version__ = "0.0.3"
 __author__ = "Casper V. Kristensen"
 __licence__ = "GPLv3"

dailyreleases/cache.py (new file, +103 lines)

@@ -0,0 +1,103 @@
import json
import logging
import sqlite3
import urllib.parse
import urllib.request
from datetime import timedelta, datetime
from http.client import HTTPResponse
from typing import Mapping
from urllib.request import Request, urlopen
from .config import DATA_DIR, CONFIG
logger = logging.getLogger(__name__)
class Response:
def __init__(self, response: HTTPResponse = None, bytes: bytes = None) -> None:
if response is not None:
self.bytes = response.read()
else:
self.bytes = bytes
self.text = self.bytes.decode() # TODO: Detect encoding
@property
def json(self):
return json.loads(self.bytes)
connection = sqlite3.connect(DATA_DIR.joinpath("cache.sqlite"))
connection.row_factory = sqlite3.Row # allow accessing rows by index and case-insensitively by name
connection.text_factory = bytes # do not try to decode bytes as utf-8 strings
cache_time = timedelta(seconds=CONFIG["web"].getint("cache_time"))
logger.info("Requests cache time is %s", cache_time)
connection.executescript(
f"""
CREATE TABLE IF NOT EXISTS
requests (id INTEGER PRIMARY KEY,
url TEXT UNIQUE NOT NULL,
response BLOB NOT NULL,
timestamp INTEGER NOT NULL);
DELETE FROM requests
WHERE timestamp < {(datetime.utcnow() - cache_time).timestamp()};
VACUUM;
"""
)
def get(url: str, params: Mapping = None, *args, **kwargs) -> Response:
if params is not None:
url += "?" + urllib.parse.urlencode(params)
request = Request(url, *args, **kwargs)
request.add_header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:65.0) Gecko/20100101 Firefox/65.0")
row = connection.execute(
"""
SELECT response, timestamp
FROM requests
WHERE url = :url;
""", {
"url": url
}
).fetchone()
# Cache miss
if row is None:
response = Response(urlopen(request))
connection.execute(
"""
INSERT INTO requests(url, response, timestamp)
VALUES (:url, :response, :timestamp);
""", {
"url": url,
"response": response.bytes,
"timestamp": datetime.utcnow().timestamp()
}
)
connection.commit()
return response
# Cached and fresh
if datetime.fromtimestamp(row["timestamp"]) > datetime.utcnow() - cache_time:
return Response(bytes=row["response"])
# Cached but stale
response = Response(urlopen(request))
connection.execute(
"""
UPDATE requests
SET response = :response,
timestamp = :timestamp
WHERE url = :url;
""", {
"url": url,
"response": response.bytes,
"timestamp": datetime.utcnow().timestamp()
}
)
connection.commit()
return response
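A minimal usage sketch of the wrapper above, assuming the package layout given in the file header. get() mirrors the requests-style (url, params) signature it replaces, and Response.json is a property, which is why call sites elsewhere in this commit read r.json["products"] rather than r.json()["products"]:

from dailyreleases import cache

r = cache.get("https://www.gog.com/games/ajax/filtered",
              params={"search": "Aztez", "limit": 5})
print(r.text[:100])        # decoded body (encoding detection is still a TODO)
print(r.json["products"])  # parsed JSON; served from SQLite while the entry is fresh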


@@ -9,7 +9,6 @@ mode = test
 [logging]
 level = DEBUG
-file = logs/main.log
 backup_count = 10


@@ -1,7 +1,6 @@
 import configparser
 import logging
 import logging.config
-import os
 import shutil
 from pathlib import Path
@@ -18,8 +17,7 @@ def read_config() -> configparser.ConfigParser:
     Read and return config file. Copies default config template to data dir if it doesn't already exist.
     """
     if not CONFIG_FILE.exists():
-        print("Copying default configuration file..")
-        os.makedirs(DATA_DIR, exist_ok=True)
+        DATA_DIR.mkdir(exist_ok=True)
         shutil.copyfile(DEFAULT_CONFIG_FILE, CONFIG_FILE)
         print("Please customize", CONFIG_FILE)
@@ -31,19 +29,8 @@ def read_config() -> configparser.ConfigParser:
     return config
-def initialize_logging(config: configparser.ConfigParser):
-    """
-    Set up logging.
-    """
-    log_file = DATA_DIR.joinpath(config["logging"]["file"])
-    log_level = config["logging"]["level"]
-    log_backup_count = config["logging"].getint("backup_count")
-    os.makedirs(log_file.parent, exist_ok=True)
-    logging.config.dictConfig(logging_config(log_file, log_level, log_backup_count))
-    logger.info("Logging level is %s", log_level)
-    logger.info("Logging to %s - backup count is %s", log_file, log_backup_count)
+CONFIG = read_config()
 def logging_config(file, level, backup_count) -> dict:
     return {
@@ -65,7 +52,7 @@ def logging_config(file, level, backup_count) -> dict:
             "class": "logging.handlers.TimedRotatingFileHandler",
             "when": "midnight",
             "backupCount": backup_count,
-            "filename": DATA_DIR.joinpath(file),
+            "filename": file,
             "encoding": "utf-8",
             "formatter": "standard",
             "level": level
@@ -81,3 +68,21 @@ def logging_config(file, level, backup_count) -> dict:
             "level": "WARNING"
         }
     }
+def initialize_logging(config: configparser.ConfigParser = CONFIG) -> None:
+    """
+    Set up logging.
+    """
+    file = DATA_DIR.joinpath("logs/main.log")
+    level = config["logging"]["level"]
+    backup_count = config["logging"].getint("backup_count")
+    file.parent.mkdir(exist_ok=True)
+    logging.config.dictConfig(logging_config(file, level, backup_count))
+    logger.info("Logging level is %s", level)
+    logger.info("Logging to %s - backup count is %s", file, backup_count)
+initialize_logging()
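Note the ordering this file now relies on: CONFIG = read_config() and the trailing initialize_logging() call run as import-time side effects, so any module doing a plain import of .config gets a ready configuration and fully configured logging. A sketch of what a consumer sees, assuming the package is importable as dailyreleases:

from dailyreleases.config import CONFIG, DATA_DIR  # import alone triggers read_config() and initialize_logging()

cache_time = CONFIG["web"].getint("cache_time")  # the same lookup cache.py performs
print(DATA_DIR.joinpath("cache.sqlite"), cache_time)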


@@ -1,29 +0,0 @@
import logging
from dailyreleases import util
logger = logging.getLogger(__name__)
class GOG(object):
def __init__(self, cache) -> None:
self.cache = cache
def search(self, query):
logger.debug("Searching GOG for %s", query)
payload = {
"limit": 5,
"search": query
}
products = {p["title"]: p
for p in self.cache.get("https://www.gog.com/games/ajax/filtered", params=payload).json()["products"]
if p["isGame"]}
best_match = util.case_insensitive_close_matches(query, products, n=1, cutoff=0.90)
if not best_match:
logger.debug("Unable to find %s in GOG search results", query)
return
logger.debug("Best match is '%s'", best_match[0])
return "https://gog.com{url}".format(**products[best_match[0]])


@@ -1,445 +1,195 @@
 import inspect
-import json
 import logging
 import re
-import string
 import textwrap
 import time
 from collections import defaultdict
 from datetime import datetime, timedelta
+from typing import Set
 import prawcore
-import requests_cache
-from dailyreleases import config, __version__, util
-from dailyreleases.gog import GOG
-from dailyreleases.predb import Predb
-from dailyreleases.reddit import Reddit
-from dailyreleases.steam import Steam
-from dailyreleases.web import Web
+from . import __version__, util, reddit, predbs, parsing
+from .config import CONFIG, DATA_DIR
+from .parsing import ParsedReleases, Release, ReleaseType
 logger = logging.getLogger(__name__)
-class DailyReleasesBot(object):
-    def __init__(self) -> None:
-        print(f"Starting Daily Releases Bot v{__version__}")
-        # Load config file and initialize logging based on its contents
-        self.config = config.read_config()
-        config.initialize_logging(self.config)
-        # Setup caching of requests; helps reduce the number of requests if the same game gets multiple releases
-        cache_time = self.config["web"].getint("cache_time")
-        self.cache = requests_cache.core.CachedSession(cache_name=str(config.DATA_DIR.joinpath("cache")),
-                                                       expire_after=cache_time)
-        logger.info("Requests cache time is %ss", cache_time)
-        # Initialize sub-modules
-        self.web = Web(self.config, self.cache)
-        self.predb = Predb(self.cache)
-        self.steam = Steam(self.cache)
-        self.gog = GOG(self.cache)
-        self.reddit = Reddit(self.config)
-    def run(self):
-        mode = self.config["main"]["mode"]
-        logger.info("Mode is %s", mode)
-        if mode == "test":
-            self.generate(post=False)
-        if mode == "immediately":
-            self.generate(post=True, pm_recipients=self.config["reddit"]["notify_users"].split(","))
-        if mode == "reply":
-            self.listen_inbox()
-    def listen_inbox(self):
-        logger.info("Listening on reddit inbox stream")
-        authorized_users = self.config["reddit"]["authorized_users"].split(",")
-        while True:
-            try:
-                for message in self.reddit.praw.inbox.stream():
-                    if message.author in authorized_users:
-                        self.generate(post=True, pm_recipients=(message.author.name,))
-                    else:
-                        logger.info("Discarding PM from %s: not authorized user", message.author)
-                    message.mark_read()  # mark message read last so we can retry after potential fatal errors
-            except prawcore.PrawcoreException as e:
-                logger.warning("PrawcoreException: %s", e)
-                logger.info("Restarting inbox listener..")
-            except KeyboardInterrupt:
-                print("Exiting (KeyboardInterrupt)")
-                break
-    def find_store_links(self, game_name) -> dict:
-        links = {}
-        # Steam
-        steam_link = self.steam.search(game_name)
-        if steam_link:
-            links["Steam"] = steam_link
-        # GOG
-        gog_link = self.gog.search(game_name)
-        if gog_link:
-            links["GOG"] = f"{gog_link} 'DRM-Free! 👍'"  # hover text
-        if links:
-            return links
-        # If none of those worked, try Googling the game
-        known_stores = {
-            "store.steampowered.com/(app|sub|bundle)": "Steam",  # order doesn't matter
-            "gog.com/game": "GOG",
-            "origin.com": "Origin",
-            "ubi(soft)?.com": "Ubisoft",
-            "www.microsoft.com/.*p": "Microsoft Store",
-            "itch.io": "Itch.io",
-            "bigfishgames.com": "Big Fish Games",
-            "gamejolt.com": "Game Jolt",
-            "alawar.com": "Alawar",
-            "wildtangent.com": "WildTangent Games"
-        }
-        # Multiple store links are sometimes returned, but we believe in Google's algorithm and choose the first one
-        for link in self.web.search(f"{game_name} buy"):
-            for store_url, store_name in known_stores.items():
-                if re.search(store_url, link, flags=re.IGNORECASE):
-                    return {store_name: link}
-        logger.debug("Unable to find store links for %s", game_name)
-        return {}
-    def parse_dirname(self, dirname):
-        logger.info("---")
-        logger.info("Parsing: %s", dirname)
-        # Extract group name
-        rls_name, group = dirname.rsplit("-", 1)
-        # Find game name by matching until one of the stopwords
-        game_name, *stopwords = re.split("[._-](update|v[0-9]+|Crack[._-]?fix|mac[._-]?os[._-]?x?|linux|MULTI[0-9]+|RIP"
-                                         "|GOG|PROPER|REPACK|Multilanguage|incl|Standalone|x(?:86|64)"
-                                         "|(?:86|64)[._-]?bit|German|CZECH|RUSSIAN|KOREAN|ITALIAN|SWEDISH|DANISH|French"
-                                         "|Slovak|DIR[._-]?FIX|build[._-]?[0-9]+|READNFO|Hotfix|DLC[._-]?Unlocker"
-                                         "|iNTERNAL|Steam[._-]?Edition)",
-                                         rls_name, flags=re.IGNORECASE)
-        # Prettify game name by substituting word delimiters with spaces and capitalizing each word. Delimiters
-        # separated by fewer than two letters are not substituted, to allow titles like "R.O.V.E.R."
-        game_name = string.capwords(re.sub("[._-]([a-zA-Z]{2,}|[0-9]+)", " \g<1>", game_name))
-        # Some stopwords are interesting enough to add next to the game in parentheses - we call these tags
-        tags = [stopword
-                for stopword in stopwords
-                if re.match("Crack[._-]?fix|MULTI[0-9]+|RIP|REPACK|x(?:86|64)|(?:86|64)[._-]?bit|German|CZECH|RUSSIAN"
-                            "|KOREAN|ITALIAN|SWEDISH|DANISH|French|Slovak|Hotfix|DIR[._-]?FIX",
-                            stopword, flags=re.IGNORECASE)]
-        # Some stopwords are even more interesting and deserve to be highlighted (e.g. '- PROPER' next to the name)
-        highlights = [highlight
-                      for highlight in stopwords
-                      if re.match("PROPER|READNFO", highlight, flags=re.IGNORECASE)]
-        # Find platform
-        if re.search("mac[._-]?os[._-]?x?", rls_name, flags=re.IGNORECASE):
-            platform = "Mac OSX"
-        elif re.search("linux", rls_name, flags=re.IGNORECASE):
-            platform = "Linux"
-        else:
-            platform = "Windows"
-        # Find type (game/dlc/update)
-        # Order of the if-statements is important: Update trumps DLC because an update to a DLC is an update, not a DLC!
-        rls_type = "Game"
-        if re.search("(?<!incl[._-])dlc",  # 'Incl.DLC' isn't a DLC-release
-                     rls_name, flags=re.IGNORECASE):
-            rls_type = "DLC"
-        if re.search("update|v[0-9]|addon|Crack[._-]?fix|DIR[._-]?FIX|build[._-]?[0-9]+",
-                     rls_name, flags=re.IGNORECASE):
-            rls_type = "Update"
-        logger.info("Offline: %s %s : %s - %s", platform, rls_type, game_name, group)
-        logger.info("Tags: %s. Highlights: %s", tags, highlights)
-        # Find store links
-        store_links = self.find_store_links(game_name)
-        # No store link? Probably software and not a game
-        if not store_links:
-            logger.info("Skipping %s: no store link (probably software)", dirname)
-            return
-        # Game score and number of reviews is -1 by default; it is updated if the game exists on Steam
-        score = -1
-        num_reviews = -1
-        # If one of the store links we found is to Steam, use their API to get (better) information about the game.
-        # Note: Doesn't apply to Steam bundles, as Steam has no public API for those.
-        if "Steam" in store_links:
-            logger.debug("Getting information about game using Steam API")
-            steam_type, steam_appid = re.search("(app|sub|bundle)(?:/)([0-9]+)", store_links["Steam"]).groups()
-            if steam_type == "bundle":
-                logger.debug("Steam link is to bundle: not utilizing API")
-            else:
-                # If the release is a package on Steam (e.g. game + dlc), we need to find the base game of the package
-                if steam_type == "sub":
-                    steam_packagedetails = self.steam.packagedetails(steam_appid)
-                    # Set game name to package name (e.g. 'Fallout New Vegas Ultimate' instead of 'Fallout New Vegas')
-                    game_name = steam_packagedetails["name"]
-                    # Find "base game" of the package; the most popular app (i.e. the one with the most reviews)
-                    steam_package_appids = [str(app["id"]) for app in steam_packagedetails["apps"]]
-                    steam_package_apps_appdetails = [self.steam.appdetails(appid) for appid in steam_package_appids]
-                    steam_package_basegame_appdetails = max(steam_package_apps_appdetails,
-                                                            key=lambda app: self.steam.reviews(app["steam_appid"]).num)
-                    # Use the base game as the basis for further computation
-                    steam_appdetails = steam_package_basegame_appdetails
-                    steam_appid = steam_package_basegame_appdetails["steam_appid"]
-                # Otherwise, if the release is a single game on Steam
-                else:
-                    steam_appdetails = self.steam.appdetails(steam_appid)
-                    game_name = steam_appdetails["name"]
-                # Now that we have a single Steam game to represent the release, use it to improve the information
-                score, num_reviews = self.steam.reviews(steam_appid)
-                # DLC releases don't always contain the word "dlc" (e.g. 'Fallout New Vegas: Dead Money'), so some DLCs
-                # get mislabeled as games during offline parsing. We can use Steam's API to get the correct type, but if
-                # the release was already deemed an update, keep it as such, because an update to a DLC is an update.
-                if steam_appdetails["type"] == "dlc" and rls_type != "Update":
-                    rls_type = "DLC"
-                # Add highlight if "denuvo" occurs in Steam's DRM notice or potential 3rd-party EULA
-                if "denuvo" in (steam_appdetails.get("drm_notice", "") + self.steam.eula(steam_appid)).lower():
-                    logger.info("'denuvo' found in Steam DRM-notice/EULA; adding 'DENUVO' to highlights")
-                    highlights.append("DENUVO")
-        release = {
-            "dirname": dirname,
-            "rls_name": rls_name,  # dirname without group
-            "group": group,
-            "game_name": game_name,
-            "type": rls_type,
-            "platform": platform,
-            "store_links": store_links,
-            "score": score,
-            "num_reviews": num_reviews,
-            "tags": tags,
-            "highlights": highlights
-        }
-        logger.info("Final : %s %s : %s - %s : %s", platform, rls_type, game_name, group, json.dumps(release))
-        return release
-    def parse_releases(self, releases, already_posted):
-        parsed_releases = defaultdict(lambda: defaultdict(list))
-        failed = set()
-        # Remove old releases from already_posted to save space and memory
-        already_posted.intersection_update(releases)
-        for dirname, (nfo_link, timestamp) in releases.items():
-            # Skip release if already posted in a previous daily releases post
-            if dirname in already_posted:
-                logger.info("Skipping %s: dirname in already posted", dirname)
-                continue
-            # Skip release if dirname contains any of the blacklisted words (we only want pc games!)
-            if re.search("Keygen|Keymaker|Lynda|3ds[._-]?Max|For[._-]Maya|Fedora|openSUSE|Ubuntu|Debian|jQuery|CSS"
-                         "|Cinema4D|3DS|[._-]Wii|[._-]WiiU|ASP[._-]NET|[._-]PSP|[._-]NSW|Windows[._-]Server|OREILLY"
-                         "|TUTORIAL|1080p|720p|x264|eBook|PS4|Wintellectnow",
-                         dirname, flags=re.IGNORECASE):
-                logger.info("Skipping %s: contains blacklisted word", dirname)
-                continue
-            # Skip release if it is older than 48 hours
-            if timestamp < datetime.now() - timedelta(hours=48):
-                logger.info("Skipping %s: older than 48 hours (but not in already_posted!)", dirname)
-                continue
-            # Parse dirname
-            try:
-                parsed_dirname = self.parse_dirname(dirname)
-            except Exception as e:
-                failed.add(dirname)
-                logger.exception(e)
-                continue
-            # Skip if there is no data about the release (e.g. if it is deemed a non-game)
-            if not parsed_dirname:
-                continue
-            # Add release to dict of parsed releases by platform and type
-            release = {**parsed_dirname, "nfo_link": nfo_link}
-            parsed_releases[release["platform"]][release["type"]].append(release)
-            already_posted.add(dirname)
-        logger.debug("Parsed releases: %s", json.dumps(parsed_releases))
-        logger.debug("Failed releases: %s", ", ".join(failed))
-        return parsed_releases, failed
-    def generate_post(self, parsed_releases):
-        post = []
-        for platform_name, platform_releases in sorted(parsed_releases.items(),
-                                                       key=lambda n: ("Windows", "Mac OSX", "Linux").index(n[0])):
-            # Skip platform if there are no releases for it
-            if not platform_releases:
-                continue
-            post.append(f"# {platform_name}")
-            for type_name, type_releases in sorted(platform_releases.items(),
-                                                   key=lambda n: ("Game", "Update", "DLC").index(n[0])):
-                # Skip release type if there are no releases for it
-                if not type_releases:
-                    continue
-                # Releases in the tables are grouped by release group, and the groups are ordered according to the most
-                # popular game within the group. Games are sorted by popularity internally in the groups as well.
-                # The popularity of a game is defined by the number of reviews it has on Steam. The popularity of the
-                # release itself extends this definition, but ranks RIPs lower than non-RIPS.
-                def popularity(rls):
-                    is_rip = "RIP" in [tag.upper() for tag in rls["tags"]]
-                    return rls["num_reviews"], not is_rip
-                group_order = defaultdict(lambda: (-1, False))
-                for rls in type_releases:
-                    group = rls["group"]
-                    group_order[group] = max(group_order[group], popularity(rls))
-                sorted_releases = sorted(type_releases,
-                                         key=lambda r: (group_order[r["group"]],
-                                                        r["group"],  # ensure grouping if two groups share group_order
-                                                        popularity(r)),
-                                         reverse=True)
-                # The rows in tables containing updates will use the full rls_name as the name, while tables containing
-                # game and DLC releases will show tags and highlights, as well as the actual stylized game_name.
-                def row(rls):
-                    if type_name == "Update":
-                        name = "[{}]({})".format(rls["rls_name"], rls["nfo_link"])
-                    else:
-                        tags = " ({})".format(" ".join(rls["tags"])) if rls["tags"] else ""
-                        highlights = " **- {}**".format(", ".join(rls["highlights"])) if rls["highlights"] else ""
-                        name = "[{}{}]({}){}".format(util.markdown_escape(rls["game_name"]), tags, rls["nfo_link"],
-                                                     highlights)
-                    if rls["score"] == -1:
-                        reviews = "-"
-                    else:
-                        num_reviews_humanized = util.humanize(rls["num_reviews"], precision=1, prefix="dec", suffix="")
-                        reviews = "{:.0%} ({})".format(rls["score"], num_reviews_humanized)
-                    stores = ", ".join(f"[{name}]({link})" for name, link in rls["store_links"].items())
-                    return name, rls["group"], stores, reviews
-                post.append(f"| {type_name} | Group | Store | Score (Reviews) |")
-                post.append("|:-|:-|:-|:-|")
-                post.extend("| {} | {} | {} | {} |".format(*row(rls)) for rls in sorted_releases)
-                post.append("")
-            post.append("&nbsp;")
-            post.append("")
-            post.append("")
-            post.append("")
-        if not post:
-            logger.warning("Post is empty!")
-            post.append("No releases today! :o")
-        # Add link to the previous release thread
-        previous_post = self.reddit.get_previous_daily_post(self.config["reddit"]["posts_subreddit"])
-        previous_post_date = re.search("daily release.*[(](.*)[)]", previous_post.title, flags=re.IGNORECASE).group(1)
-        post.append("# [<< {}]({})".format(previous_post_date, previous_post.url))
-        # Add epilogue
-        try:
-            with config.DATA_DIR.joinpath("epilogue.txt").open() as file:
-                post.extend(line.rstrip() for line in file.readlines())
-        except FileNotFoundError:
-            logger.info("epilogue.txt not found")
-        # Convert post list to string
-        post_str = "\n".join(post)
-        logger.debug("Generated post:\n%s", post_str)
-        return post_str
-    def generate(self, post=False, pm_recipients=None):
-        logger.info("-------------------------------------------------------------------------------------------------")
-        start_time = time.time()
-        already_posted = self.load_already_posted()
-        releases = self.predb.get_releases()
-        parsed_releases, failed_dirnames = self.parse_releases(releases, already_posted)
-        # The date of the post changes at midday instead of midnight to allow calling script after 00:00
-        title = "Daily Releases ({})".format((datetime.today() - timedelta(hours=12)).strftime("%B %-d, %Y"))
-        generated_post = self.generate_post(parsed_releases)
-        generated_post_src = textwrap.indent(generated_post, "    ")
-        if post:
-            # Post to bot's own subreddit
-            bot_subreddit = self.config["reddit"]["bot_subreddit"]
-            reddit_src_post = self.reddit.submit_post(f"{title} - Source", generated_post_src, bot_subreddit)
-            reddit_post = self.reddit.submit_post(title, generated_post, bot_subreddit)
-            # Manually approve posts since reddit seem to think posts with many links are spam
-            reddit_src_post.mod.approve()
-            reddit_post.mod.approve()
-            self.save_already_posted(already_posted)
-            if pm_recipients is not None:
-                msg = inspect.cleandoc(
-                    f"""
-                    [Preview]({reddit_post.url})
-                    [Source]({reddit_src_post.url})
-                    Failed: {", ".join(failed_dirnames)}
-                    """
-                )
-                for recipient in pm_recipients:
-                    self.reddit.send_pm(recipient, title, msg)
-        logger.info("Execution took %s seconds", int(time.time() - start_time))
-        logger.info("-------------------------------------------------------------------------------------------------")
-        # Clean requests cache after each successful generation so it doesn't grow indefinitely
-        self.cache.remove_expired_responses()
-    def load_already_posted(self):
-        try:
-            with config.DATA_DIR.joinpath("already_posted").open() as file:
-                return set(line.rstrip() for line in file.readlines())
-        except FileNotFoundError:
-            return set()
-    def save_already_posted(self, already_posted):
-        logger.info("Saving already posted to file")
-        with config.DATA_DIR.joinpath("already_posted").open("w") as file:
-            for dirname in already_posted:
-                file.write("{}\n".format(dirname))
-def main():
-    bot = DailyReleasesBot()
-    bot.run()
+def listen_inbox() -> None:
+    logger.info("Listening on reddit inbox stream")
+    authorized_users = CONFIG["reddit"]["authorized_users"].split(",")
+    while True:
+        try:
+            for message in reddit.praw.inbox.stream():
+                if message.author in authorized_users:
+                    generate(post=True, pm_recipients=(message.author.name,))
+                else:
+                    logger.info("Discarding PM from %s: not authorized user", message.author)
+                message.mark_read()  # mark message read last so we can retry after potential fatal errors
+        except prawcore.PrawcoreException as e:
+            logger.warning("PrawcoreException: %s", e)
+            logger.info("Restarting inbox listener..")
+        except KeyboardInterrupt:
+            print("Exiting (KeyboardInterrupt)")
+            break
+def generate_post(parsed_releases: ParsedReleases) -> str:
+    post = []
+    for platform, platform_releases in parsed_releases.items():
+        if sum(len(pr) for pr in platform_releases.values()) == 0:
+            continue
+        post.append(f"# {platform}")
+        for release_type, releases in platform_releases.items():
+            if not releases:
+                continue
+            # Releases in the tables are grouped by release group, and the groups are ordered according to the most
+            # popular game within the group. Games are sorted by popularity internally in the groups as well.
+            def popularity(release: Release):
+                # The popularity of a game is defined by the number of reviews it has on Steam, however, we rank RIPs
+                # lower than non-RIPs so the same game released as both will sort the non-RIP first.
+                is_rip = "RIP" in [tag.upper() for tag in release.tags]
+                return release.num_reviews, not is_rip
+            group_order = defaultdict(lambda: (-1, False))
+            for release in releases:
+                group_order[release.group] = max(group_order[release.group], popularity(release))
+            def order(release: Release):
+                return (group_order[release.group],
+                        release.group,  # ensure grouping if two groups share group_order
+                        popularity(release))
+            def row(release: Release):
+                # The rows in the table containing updates will use the full rls_name as the name, while tables
+                # containing game and DLC releases will show tags and highlights, as well as the stylized game_name.
+                if release_type == ReleaseType.UPDATE:
+                    name = f"[{release.rls_name}]({release.nfo_link})"
+                else:
+                    tags = " ({})".format(" ".join(release.tags)) if release.tags else ""
+                    highlights = " **- {}**".format(", ".join(release.highlights)) if release.highlights else ""
+                    name = "[{}{}]({}){}".format(util.markdown_escape(release.game_name),
+                                                 tags,
+                                                 release.nfo_link,
+                                                 highlights)
+                if release.score == -1:
+                    reviews = "-"
+                else:
+                    num_reviews_humanized = util.humanize(release.num_reviews, precision=1, prefix="dec", suffix="")
+                    reviews = f"{release.score:.0%} ({num_reviews_humanized})"
+                stores = ", ".join(f"[{name}]({link})" for name, link in release.store_links.items())
+                return name, release.group, stores, reviews
+            post.append(f"| {release_type} | Group | Store | Score (Reviews) |")
+            post.append("|:-|:-|:-|:-|")
+            post.extend("| {} | {} | {} | {} |".format(*row(rls)) for rls in sorted(releases, key=order, reverse=True))
+            post.append("")
+        post.append("&nbsp;")
+        post.append("")
+        post.append("")
+        post.append("")
+    if not post:
+        logger.warning("Post is empty!")
+        post.append("No releases today! :o")
+    # Add link to the previous release thread
+    previous_post = reddit.get_previous_daily_post(CONFIG["reddit"]["posts_subreddit"])
+    previous_post_date = re.search("daily release.*[(](.*)[)]", previous_post.title, flags=re.IGNORECASE).group(1)
+    post.append(f"# [<< {previous_post_date}]({previous_post.url})")
+    # Add epilogue
+    try:
+        with DATA_DIR.joinpath("epilogue.txt").open() as file:
+            post.extend(line.rstrip() for line in file.readlines())
+    except FileNotFoundError:
+        logger.info("No epilogue.txt")
+    # Convert post list to string
+    post_str = "\n".join(post)
+    logger.debug("Generated post:\n%s", post_str)
+    return post_str
+def generate(post=False, pm_recipients=None) -> None:
+    logger.info("-------------------------------------------------------------------------------------------------")
+    start_time = time.time()
+    already_posted = load_already_posted()
+    releases = predbs.get_releases()
+    # Remove old releases from already_posted to save space
+    already_posted.intersection_update(releases)
+    parsed_releases = parsing.parse_releases(releases, already_posted)
+    # The date of the post changes at midday instead of midnight to allow calling script after 00:00
+    title = f"Daily Releases ({(datetime.today() - timedelta(hours=12)).strftime('%B %-d, %Y')})"
+    generated_post = generate_post(parsed_releases)
+    generated_post_src = textwrap.indent(generated_post, "    ")
+    if post:
+        # Post to bot's own subreddit
+        bot_subreddit = CONFIG["reddit"]["bot_subreddit"]
+        reddit_src_post = reddit.submit_post(f"{title} - Source", generated_post_src, bot_subreddit)
+        reddit_post = reddit.submit_post(title, generated_post, bot_subreddit)
+        # Manually approve posts since reddit seem to think posts with many links are spam
+        reddit_src_post.mod.approve()
+        reddit_post.mod.approve()
+        save_already_posted(already_posted)
+        if pm_recipients is not None:
+            msg = inspect.cleandoc(
+                f"""
+                [Preview]({reddit_post.url})
+                [Source]({reddit_src_post.url})
+                """
+            )
+            for recipient in pm_recipients:
+                reddit.send_pm(recipient, title, msg)
+    logger.info("Execution took %s seconds", int(time.time() - start_time))
+    logger.info("-------------------------------------------------------------------------------------------------")
+def load_already_posted() -> Set[str]:
+    try:
+        with DATA_DIR.joinpath("already_posted").open() as file:
+            return {line.rstrip() for line in file.readlines()}
+    except FileNotFoundError:
+        return set()
+def save_already_posted(already_posted) -> None:
+    logger.info("Saving already posted to file")
+    with DATA_DIR.joinpath("already_posted").open("w") as file:
+        file.writelines(already_posted)
+def main() -> None:
+    print(f"Starting Daily Releases Bot v{__version__}")
+    mode = CONFIG["main"]["mode"]
+    logger.info("Mode is %s", mode)
+    if mode == "test":
+        generate(post=False)
+    if mode == "immediately":
+        generate(post=True, pm_recipients=CONFIG["reddit"]["notify_users"].split(","))
+    if mode == "reply":
+        listen_inbox()
 if __name__ == '__main__':

dailyreleases/parsing.py (new file, +227 lines)

@@ -0,0 +1,227 @@
import logging
import re
import string
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, List, Set, Dict, Iterable
from . import stores
from .predbs import PredbRelease
from .stores import steam
logger = logging.getLogger(__name__)
class ReleaseType(str, Enum):
GAME = "Game"
UPDATE = "Update"
DLC = "DLC"
def __str__(self) -> str:
return self.value
class Platform(str, Enum):
WINDOWS = "Windows"
OSX = "Mac OSX"
LINUX = "Linux"
def __str__(self) -> str:
return self.value
@dataclass
class Release:
dirname: str
rls_name: str # dirname without group
group: str
game_name: str
type: ReleaseType
platform: Platform
nfo_link: str
store_links: Dict[str, str] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
highlights: List[str] = field(default_factory=list)
score: int = -1 # score and number of reviews is -1 by default; it is updated if the game exists on Steam
num_reviews: int = -1
STOPWORDS = (
"update",
"v[0-9]+",
"build[._-]?[0-9]+",
"iNTERNAL",
"incl",
"Standalone",
"Multilanguage",
"DLC",
"DLC[._-]?Unlocker",
"Steam[._-]?Edition",
"GOG",
"mac[._-]?os[._-]?x?",
"linux",
)
TAGS = (
"Hotfix",
"Crack[._-]?fix",
"Dir[._-]?fix",
"MULTI[._-]?[0-9]+",
"x(?:86|64)",
"(?:86|64)[._-]?bit",
"RIP",
"REPACK",
"German",
"Czech",
"Russian",
"Korean",
"Italian",
"Swedish",
"Danish",
"French",
"Slovak",
)
HIGHLIGHTS = (
"PROPER",
"READNFO",
)
BLACKLISTED = (
"Keygen",
"Keymaker",
"[._-]3DS",
"[._-]NSW",
"[._-]PS4",
"[._-]PSP",
"[._-]Wii",
"[._-]WiiU",
"x264",
"720p",
"1080p",
"eBook",
"TUTORIAL",
"Debian",
"Ubuntu",
"Fedora",
"openSUSE",
"jQuery",
"CSS"
"ASP[._-]NET",
"Windows[._-]Server",
"Lynda",
"OREILLY"
"Wintellectnow",
"3ds[._-]?Max",
"For[._-]Maya",
"Cinema4D",
)
def parse_dirname(dirname: str, nfo_link: str) -> Optional[Release]:
logger.info("---")
logger.info("Parsing: %s", dirname)
# Extract group name
rls_name, group = dirname.rsplit("-", maxsplit=1)
# Find game name by matching until one of the stopwords
game_name, *stopwords = re.split("[._-]({})".format("|".join(STOPWORDS + TAGS + HIGHLIGHTS)),
rls_name, flags=re.IGNORECASE)
# Prettify game name by substituting word delimiters with spaces and capitalizing each word.
game_name = string.capwords(re.sub("[_-]", " ", game_name))
# Dots separated by fewer than two letters are not substituted to allow titles like "R.O.V.E.R."
game_name = string.capwords(re.sub("[.]([a-zA-Z]{2,}|[0-9]+)", " \g<1>", game_name))
# Some stopwords distinguish two otherwise identical releases (e.g. x86/x64) - we call these tags
tags = [stopword
for stopword in stopwords
if re.match("|".join(TAGS), stopword, flags=re.IGNORECASE)]
# Some stopwords signify an important piece of information and deserve to be highlighted (e.g. PROPER)
highlights = [stopword
for stopword in stopwords
if re.match("|".join(HIGHLIGHTS), stopword, flags=re.IGNORECASE)]
# Find platform
if re.search("mac[._-]?os[._-]?x?", rls_name, flags=re.IGNORECASE):
platform = Platform.OSX
elif re.search("linux", rls_name, flags=re.IGNORECASE):
platform = Platform.LINUX
else:
platform = Platform.WINDOWS
# Find release type (Game/DLC/Update)
# Order of the if-statements is important: Update trumps DLC because an update to a DLC is an update, not a DLC!
if re.search("update|v[0-9]|addon|Crack[._-]?fix|DIR[._-]?FIX|build[._-]?[0-9]+", rls_name, flags=re.IGNORECASE):
rls_type = ReleaseType.UPDATE
elif re.search("(?<!incl[._-])dlc", rls_name, flags=re.IGNORECASE): # 'Incl.DLC' isn't a DLC-release
rls_type = ReleaseType.DLC
else:
rls_type = ReleaseType.GAME
logger.info("Offline: %s %s : %s - %s", platform, rls_type, game_name, group)
logger.info("Tags: %s. Highlights: %s", tags, highlights)
# Find store links
store_links = stores.find_store_links(game_name)
# No store link? Probably software and not a game
if not store_links:
logger.info("Skipping %s: no store link (probably software)", dirname)
return None
release = Release(
dirname=dirname,
rls_name=rls_name,
group=group,
game_name=game_name,
type=rls_type,
platform=platform,
nfo_link=nfo_link,
store_links=store_links,
tags=tags,
highlights=highlights
)
# If one of the store links we found is to Steam, use their API to get (better) information about the game.
if "Steam" in store_links:
steam.update_info(store_links["Steam"], release)
logger.info("Final : %s %s : %s - %s : %s", release.platform, release.type, release.game_name, release.group,
release)
return release
ParsedReleases = Dict[Platform, Dict[ReleaseType, List[Release]]]
def parse_releases(releases: Iterable[PredbRelease], already_posted: Set[str]) -> ParsedReleases:
parsed_releases = {platform: {release_type: [] for release_type in ReleaseType}
for platform in Platform} # {Windows: {Game: [..], DLC: [..], ..}, Linux: ...}
for release in releases:
if release.dirname in already_posted:
logger.info("Skipping %s: dirname in already posted", release.dirname)
continue
if re.search("|".join(BLACKLISTED), release.dirname, flags=re.IGNORECASE):
logger.info("Skipping %s: contains blacklisted word", release.dirname)
continue
if release.timestamp < datetime.now() - timedelta(hours=48):
logger.info("Skipping %s: older than 48 hours (but not in already_posted!?)", release.dirname)
continue
release = parse_dirname(release.dirname, release.nfo_link)
if not release:
continue # skip if there is no data about the release (e.g. if it is deemed a non-game by parse_dirname)
# Add release to dict of parsed releases by platform and type
parsed_releases[release.platform][release.type].append(release)
already_posted.add(release.dirname)
logger.debug("Parsed releases: %s", parsed_releases)
return parsed_releases
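A sketch of the new public entry point, assuming live store lookups are acceptable. parse_dirname returns None for non-games, so callers must check; the nfo link below is a hypothetical placeholder:

from dailyreleases.parsing import parse_dirname, ReleaseType, Platform

release = parse_dirname("Aztez-DARKSiDERS", "https://example.org/nfo")  # placeholder nfo link
if release is not None:
    assert release.platform is Platform.WINDOWS
    assert release.type is ReleaseType.GAME
    print(release.game_name, release.store_links)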


@@ -1,63 +0,0 @@
import logging
from datetime import datetime
import requests
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
class Predb(object):
def __init__(self, cache) -> None:
self.cache = cache
def get_releases(self):
logger.info("Getting releases from predbs")
releases = {}
for db_releases in (self._get_predbme, self._get_xrel): # in reverse order of preference
try:
releases.update(db_releases())
except requests.exceptions.ConnectionError as e:
logger.error(e)
logger.warning("Connection to predb failed, skipping..")
return releases
def _get_xrel(self, categories=("CRACKED", "UPDATE"), num_pages=2):
logger.debug("Getting releases from xrel.to")
def get_releases_in_category(category, page):
payload = {
"category_name": category,
"ext_info_type": "game",
"per_page": 100,
"page": page
}
r = self.cache.get("https://api.xrel.to/v2/release/browse_category.json", params=payload)
return r.json()["list"]
return {rls["dirname"]: (rls["link_href"], datetime.fromtimestamp(rls["time"]))
for category in categories
for page in range(1, num_pages)
for rls in get_releases_in_category(category, page)}
def _get_srrdb(self, num_pages=3):
logger.debug("Getting releases from srrdb.com")
return {rls["release"]: ("https://www.srrdb.com/release/details/{}".format(rls['release']),
datetime.strptime(rls["date"], "%Y-%m-%d %H:%M:%S"))
for p in range(1, num_pages)
for rls in
self.cache.get(f"https://www.srrdb.com/api/search/category:pc/order:date-desc/{p}").json()["results"]}
def _get_predbme(self):
logger.debug("Getting releases from predb.me")
r = self.cache.get("https://predb.me/?cats=games-pc&rss=1")
soup = BeautifulSoup(r.text, "html.parser").find_all("item")
# Predb.me doesn't show timestamps in the RSS-feed, but the feed is so short it only shows ~72 hours worth of
# releases anyway, so we just set timestamp to now.
return {item.find("title").text: (item.find("guid").text, datetime.utcnow())
for item in soup}

dailyreleases/predbs.py (new file, +64 lines)

@@ -0,0 +1,64 @@
import logging
from datetime import datetime
from typing import NamedTuple, List
from urllib.error import HTTPError
from bs4 import BeautifulSoup
from . import cache
logger = logging.getLogger(__name__)
class PredbRelease(NamedTuple):
dirname: str
nfo_link: str
timestamp: datetime
def get_releases() -> List[PredbRelease]:
logger.info("Getting releases from predbs")
releases = {}
for db_releases in (get_predbme, get_xrel): # in reverse order of preference
try:
releases.update((r.dirname, r) for r in db_releases()) # override duplicate dirnames in later iterations
except HTTPError as e:
logger.error(e)
logger.warning("Connection to predb failed, skipping..")
return list(releases.values())
def get_xrel(categories=("CRACKED", "UPDATE"), num_pages=2) -> List[PredbRelease]:
logger.debug("Getting releases from xrel.to")
def get_releases_in_category(category, page):
r = cache.get("https://api.xrel.to/v2/release/browse_category.json", params={
"category_name": category,
"ext_info_type": "game",
"per_page": 100,
"page": page
})
return r.json["list"]
return [PredbRelease(rls["dirname"],
rls["link_href"],
datetime.fromtimestamp(rls["time"]))
for category in categories
for page in range(1, num_pages)
for rls in get_releases_in_category(category, page)]
def get_predbme() -> List[PredbRelease]:
logger.debug("Getting releases from predb.me")
rss = cache.get("https://predb.me/?cats=games-pc&rss=1")
soup = BeautifulSoup(rss.text, "html.parser").find_all("item")
# Predb.me doesn't show timestamps in the RSS-feed, but the feed is so short it only shows ~72 hours worth of
# releases anyway, so we just set timestamp to now.
return [PredbRelease(item.find("title").text,
item.find("guid").text,
datetime.utcnow())
for item in soup]
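The deduplication above leans on dict insertion order: sources are iterated in reverse order of preference, so a later (more preferred) predb overwrites an earlier entry with the same dirname. The same idea in isolation, with hypothetical data:

from datetime import datetime
from dailyreleases.predbs import PredbRelease

fallback = [PredbRelease("Aztez-DARKSiDERS", "https://predb.me/nfo", datetime.utcnow())]   # hypothetical entry
preferred = [PredbRelease("Aztez-DARKSiDERS", "https://xrel.to/nfo", datetime.utcnow())]   # hypothetical entry

releases = {}
for source in (fallback, preferred):  # least preferred first, as in get_releases()
    releases.update((r.dirname, r) for r in source)
assert releases["Aztez-DARKSiDERS"].nfo_link == "https://xrel.to/nfo"  # later source wins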


@@ -1,25 +1,26 @@
 import logging
 import praw
-import praw.models.reddit.submission
+from praw.models import Submission
+from .config import CONFIG
 logger = logging.getLogger(__name__)
-class Reddit(object):
-    def __init__(self, config) -> None:
-        self.config = config
-        logger.info("Logging in to reddit")
-        self.praw = praw.Reddit(**self.config["reddit"])
+praw = praw.Reddit(**CONFIG["reddit"])
-    def send_pm(self, recipient, title, text):
-        logger.info("Sending PM to u/%s", recipient)
-        return self.praw.redditor(recipient).message(title, text)
+def send_pm(recipient, title, text) -> None:
+    logger.info("Sending PM to u/%s", recipient)
+    return praw.redditor(recipient).message(title, text)
-    def submit_post(self, title, text, subreddit):
-        logger.info("Submitting post to r/%s", subreddit)
-        return self.praw.subreddit(subreddit).submit(title, text)
+def submit_post(title, text, subreddit) -> Submission:
+    logger.info("Submitting post to r/%s", subreddit)
+    return praw.subreddit(subreddit).submit(title, text)
-    def get_previous_daily_post(self, subreddit) -> praw.models.reddit.submission.Submission:
-        return next(self.praw.subreddit(subreddit).search("daily release", sort="new", time_filter="week"))
+def get_previous_daily_post(subreddit) -> Submission:
+    logger.info("Getting previous daily post from r/%s", subreddit)
+    return next(praw.subreddit(subreddit).search("daily release", sort="new", time_filter="week"))


@@ -1,75 +0,0 @@
import logging
from collections import namedtuple
from bs4 import BeautifulSoup
from dailyreleases import util
logger = logging.getLogger(__name__)
class Steam(object):
def __init__(self, cache) -> None:
self.cache = cache
def appdetails(self, appid):
payload = {
"appids": appid
}
r = self.cache.get("https://store.steampowered.com/api/appdetails", params=payload)
return r.json()[appid]["data"]
def packagedetails(self, appid):
payload = {
"packageids": appid
}
r = self.cache.get("https://store.steampowered.com/api/packagedetails", params=payload)
return r.json()[appid]["data"]
def appreviews(self, appid):
payload = {
"start_date": -1,
"end_date": -1,
"filter": "summary",
"language": "all",
"purchase_type": "all",
"json": 1
}
r = self.cache.get(f"https://store.steampowered.com/appreviews/{appid}", params=payload)
return r.json()["query_summary"]
def reviews(self, appid):
app_review = self.appreviews(appid)
Reviews = namedtuple("Reviews", ("score", "num"))
if app_review["total_reviews"] == 0:
return Reviews(-1, -1)
positive = app_review["total_positive"] / app_review["total_reviews"]
return Reviews(positive, app_review["total_reviews"])
def eula(self, appid):
r = self.cache.get(f"https://store.steampowered.com//eula/{appid}_eula_0")
soup = BeautifulSoup(r.text, "html.parser").find(id="eula_content")
if soup is not None:
return soup.text
return ""
def search(self, query):
logger.debug("Searching Steam store for %s", query)
payload = {
"term": query
}
# Reverse results to make the first one take precedence over later ones if multiple results have the same name.
# E.g. "Wolfenstein II: The New Colossus" has both international and german version under the same name.
items = {i["name"]: i for i in reversed(self.cache.get("https://store.steampowered.com/api/storesearch",
params=payload).json()["items"])}
best_match = util.case_insensitive_close_matches(query, items, n=1, cutoff=0.90)
if not best_match:
logger.debug("Unable to find %s in Steam search results", query)
return
logger.debug("Best match is '%s'", best_match[0])
return "https://store.steampowered.com/{type}/{id}".format(**items[best_match[0]])


@@ -0,0 +1,59 @@
import logging
import re
from typing import Dict, List
from urllib.error import HTTPError
from .. import cache
from ..config import CONFIG
from ..stores import steam, gog
logger = logging.getLogger(__name__)
def web_search(query: str) -> List[str]:
logger.debug("Searching Google for %s", query)
try:
r = cache.get("https://www.googleapis.com/customsearch/v1", params={
"key": CONFIG["google"]["key"],
"cx": CONFIG["google"]["cx"],
"q": query
})
return [result["link"] for result in r.json["items"]]
except (KeyError, HTTPError) as e:
logger.exception(e)
logger.warning("Google search failed (probably rate-limited)")
return []
def find_store_links(game_name: str) -> Dict[str, str]:
links = {}
for store, name in ((steam, "Steam"), (gog, "GOG")):
link = store.search(game_name)
if link is not None:
links[name] = link
if links:
return links
# If none of those worked, try Googling the game
known_stores = {
"store.steampowered.com/(app|sub|bundle)": "Steam", # order doesn't matter
"gog.com/game": "GOG",
"origin.com": "Origin",
"ubi(soft)?.com": "Ubisoft",
"www.microsoft.com/.*p": "Microsoft Store",
"itch.io": "Itch.io",
"bigfishgames.com/games": "Big Fish Games",
"gamejolt.com": "Game Jolt",
"alawar.com": "Alawar",
"wildtangent.com": "WildTangent Games"
}
# Multiple store links are sometimes returned, but we believe in Google's algorithm and choose the first one
for link in web_search(f"{game_name} buy"):
for store_url, store_name in known_stores.items():
if re.search(store_url, link, flags=re.IGNORECASE):
return {store_name: link}
logger.debug("Unable to find store links for %s", game_name)
return {}
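Since known_stores is checked against each search result in turn, the first Google hit that contains any known store wins. The matching step in isolation, with a hypothetical result list:

import re

known_stores = {"store.steampowered.com/(app|sub|bundle)": "Steam", "gog.com/game": "GOG"}
results = ["https://gog.com/game/aztez", "https://store.steampowered.com/app/244750/Aztez/"]  # hypothetical

links = next(({name: link}
              for link in results
              for pattern, name in known_stores.items()
              if re.search(pattern, link, flags=re.IGNORECASE)),
             {})
print(links)  # {'GOG': 'https://gog.com/game/aztez'}: the first matching result wins, not the first pattern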


@@ -0,0 +1,28 @@
from __future__ import annotations
import logging
from typing import Optional
from .. import util, cache
logger = logging.getLogger(__name__)
def search(query: str) -> Optional[str]:
logger.debug("Searching GOG for %s", query)
r = cache.get("https://www.gog.com/games/ajax/filtered", params={
"search": query,
"mediaType": "game",
"limit": 5
})
products = {p["title"]: p
for p in r.json["products"]
if p["isGame"]}
try:
best_match = products[util.case_insensitive_close_matches(query, products, n=1, cutoff=0.90)[0]]
logger.debug("Best match is '%s'", best_match)
return "https://gog.com{url}".format(**best_match)
except IndexError:
logger.debug("Unable to find %s in GOG search results", query)
return None


@@ -0,0 +1,127 @@
from __future__ import annotations
import logging
import re
from typing import TypeVar, Optional, Tuple
from bs4 import BeautifulSoup
from .. import util, cache
logger = logging.getLogger(__name__)
AppID = TypeVar("AppID", int, str)
def appdetails(appid: AppID) -> dict:
r = cache.get("https://store.steampowered.com/api/appdetails", params={
"appids": appid
})
return r.json[str(appid)]["data"]
def packagedetails(appid: AppID) -> dict:
r = cache.get("https://store.steampowered.com/api/packagedetails", params={
"packageids": appid
})
return r.json[str(appid)]["data"]
def appreviews(appid: AppID) -> dict:
r = cache.get(f"https://store.steampowered.com/appreviews/{appid}", params={
"start_date": -1,
"end_date": -1,
"filter": "summary",
"language": "all",
"purchase_type": "all",
"json": 1
})
return r.json["query_summary"]
def reviews(appid: AppID) -> Tuple[int, int]:
app_review = appreviews(appid)
if app_review["total_reviews"] == 0:
return -1, -1
positive = app_review["total_positive"] / app_review["total_reviews"]
return positive, app_review["total_reviews"]
def eula(appid: AppID) -> str:
r = cache.get(f"https://store.steampowered.com//eula/{appid}_eula_0")
soup = BeautifulSoup(r.text, "html.parser").find(id="eula_content")
if soup is not None:
return soup.text
return ""
def search(query: str) -> Optional[str]:
logger.debug("Searching Steam store for %s", query)
r = cache.get("https://store.steampowered.com/search/suggest", params={
"term": query,
"f": "json",
"cc": "US",
"l": "english"
})
# Reverse results to make the first one take precedence over later ones if multiple results have the same name.
# E.g. "Wolfenstein II: The New Colossus" has both international and german version under the same name.
items = {item["name"]: item for item in reversed(r.json)}
try:
best_match = items[util.case_insensitive_close_matches(query, items, n=1, cutoff=0.90)[0]]
logger.debug("Best match is '%s'", best_match)
type_to_slug = {
"game": "app",
"dlc": "app",
"bundle": "bundle"
}
slug = type_to_slug.get(best_match['type'], best_match['type'])
return f"https://store.steampowered.com/{slug}/{best_match['id']}"
except IndexError:
logger.debug("Unable to find %s in Steam search results", query)
return None
def update_info(link: str, release: Release) -> None:
logger.debug("Getting information about game using Steam API")
link_type, appid = re.search("(app|sub|bundle)(?:/)([0-9]+)", link).groups()
if link_type == "bundle":
logger.debug("Steam link is to bundle: not utilizing API") # Steam has no public API for bundles
return
# If the link is a package on Steam (e.g. game + dlc), we need to find the base game of the package
if link_type == "sub":
package_details = packagedetails(appid)
# Set game name to package name (e.g. 'Fallout New Vegas Ultimate' instead of 'Fallout New Vegas')
release.game_name = package_details["name"]
# Use the "base game" of the package as the basis for further computation.
# We guesstimate the base game as the most popular app (i.e. the one with the most reviews)
package_appids = [app["id"] for app in package_details["apps"]]
package_apps_details = [appdetails(appid) for appid in package_appids]
details = max(package_apps_details, key=lambda app: reviews(app["steam_appid"])[1])
appid = details["steam_appid"]
# Otherwise, if the release is a single game on Steam
else:
details = appdetails(appid)
release.game_name = details["name"]
# Now that we have a single Steam game to represent the release, use it to improve the information
release.score, release.num_reviews = reviews(appid)
# DLC releases don't always contain the word "dlc" (e.g. 'Fallout New Vegas: Dead Money'), so some DLCs get
# mislabeled as games during offline parsing. We can use Steam's API to get the correct type, but if the release was
# already deemed an update, keep it as such, because an update to a DLC is an update.
if details["type"] == "dlc" and release.type != "Update":
release.type = "DLC"
# Add highlight if "denuvo" occurs in Steam's DRM notice or potential 3rd-party EULA
if "denuvo" in (details.get("drm_notice", "") + eula(appid)).lower():
logger.info("'denuvo' found in Steam DRM-notice/EULA; adding 'DENUVO' to highlights")
release.highlights.append("DENUVO")


@@ -1,31 +0,0 @@
import logging
import requests
logger = logging.getLogger(__name__)
class Web(object):
def __init__(self, config, cache) -> None:
self.config = config
self.cache = cache
def search(self, query) -> list:
try:
return self._google_search(query)
except (KeyError, requests.RequestException) as e:
logger.exception(e)
logger.warning("Google search failed (probably rate-limited)")
return []
def _google_search(self, query) -> list:
logger.debug("Searching Google for %s", query)
payload = {
"key": self.config["google"]["key"],
"cx": self.config["google"]["cx"],
"q": query
}
r = self.cache.get("https://www.googleapis.com/customsearch/v1", params=payload)
return [result["link"] for result in r.json()["items"]]


@@ -4,7 +4,7 @@ from setuptools import setup, find_packages
 from codecs import open
 from os import path
-from dailyreleases import __author__, __version__, __licence__
+from .dailyreleases import __author__, __version__, __licence__
 here = path.abspath(path.dirname(__file__))
@@ -37,8 +37,6 @@ setup(
         ]
     },
     install_requires=[
-        "requests",
-        "requests_cache",
        "praw",
        "beautifulsoup4"
     ],


@@ -1,140 +1,160 @@
 import unittest
-from dailyreleases.main import DailyReleasesBot
+from dailyreleases import parsing
+from dailyreleases.parsing import ReleaseType, Platform
 class ParseDirnameTestCase(unittest.TestCase):
-    def setUp(self):
-        self.bot = DailyReleasesBot()
     def test_single_word_release(self):
-        p = self.bot.parse_dirname("Aztez-DARKSiDERS")
-        self.assertEqual("Aztez-DARKSiDERS", p["dirname"])
-        self.assertEqual("Aztez", p["rls_name"])
-        self.assertEqual("Aztez", p["game_name"])
-        self.assertEqual("Windows", p["platform"])
-        self.assertEqual("Game", p["type"])
-        self.assertEqual("DARKSiDERS", p["group"])
-        self.assertIn("store.steampowered.com/app/244750", p["store_links"]["Steam"])
-        self.assertEqual([], p["tags"])
-        self.assertEqual([], p["highlights"])
+        p = parsing.parse_dirname("Aztez-DARKSiDERS", "nfo_link")
+        self.assertEqual("Aztez-DARKSiDERS", p.dirname)
+        self.assertEqual("Aztez", p.rls_name)
+        self.assertEqual("Aztez", p.game_name)
+        self.assertEqual(Platform.WINDOWS, p.platform)
+        self.assertEqual(ReleaseType.GAME, p.type)
+        self.assertEqual("DARKSiDERS", p.group)
+        self.assertIn("store.steampowered.com/app/244750", p.store_links["Steam"])
+        self.assertEqual([], p.tags)
+        self.assertEqual([], p.highlights)
     def test_nuked_release(self):
         # TODO: Actual nuke handling?
-        p = self.bot.parse_dirname("Battlefield.1-CPY")
-        self.assertEqual("Battlefield.1-CPY", p["dirname"])
+        p = parsing.parse_dirname("Battlefield.1-CPY", "nfo_link")
+        self.assertEqual("Battlefield.1-CPY", p.dirname)
     def test_update(self):
-        p = self.bot.parse_dirname("Car.Mechanic.Simulator.2018.Plymouth.Update.v1.5.1.Hotfix-PLAZA")
-        self.assertEqual("Update", p["type"])
+        p = parsing.parse_dirname("Car.Mechanic.Simulator.2018.Plymouth.Update.v1.5.1.Hotfix-PLAZA", "nfo_link")
+        self.assertEqual(ReleaseType.UPDATE, p.type)
+        self.assertIn("store.steampowered.com/app/754920", p.store_links["Steam"])
     def test_proper_highlight(self):
-        p = self.bot.parse_dirname("Death.Coming.PROPER-SiMPLEX")
-        self.assertEqual(["PROPER"], p["highlights"])
+        p = parsing.parse_dirname("Death.Coming.PROPER-SiMPLEX", "nfo_link")
+        self.assertEqual(["PROPER"], p.highlights)
+        self.assertIn("store.steampowered.com/app/705120", p.store_links["Steam"])
     def test_macos_release(self):
-        p = self.bot.parse_dirname("The_Fall_Part_2_Unbound_MacOS-Razor1911")
-        self.assertEqual("Mac OSX", p["platform"])
-        self.assertEqual("Game", p["type"])
+        p = parsing.parse_dirname("The_Fall_Part_2_Unbound_MacOS-Razor1911", "nfo_link")
+        self.assertEqual(Platform.OSX, p.platform)
+        self.assertEqual(ReleaseType.GAME, p.type)
+        self.assertIn("store.steampowered.com/app/510490", p.store_links["Steam"])
+        self.assertIn("gog.com/game/the_fall_part_2_unbound", p.store_links["GOG"])
     def test_macosx_update(self):
-        p = self.bot.parse_dirname("Man_O_War_Corsair_Warhammer_Naval_Battles_v1.3.2_MacOSX-Razor1911")
-        self.assertEqual("Mac OSX", p["platform"])
-        self.assertEqual("Update", p["type"])
+        p = parsing.parse_dirname("Man_O_War_Corsair_Warhammer_Naval_Battles_v1.3.2_MacOSX-Razor1911", "nfo_link")
+        self.assertEqual(Platform.OSX, p.platform)
+        self.assertEqual(ReleaseType.UPDATE, p.type)
+        self.assertIn("store.steampowered.com/app/344240", p.store_links["Steam"])
+        self.assertIn("gog.com/game/man_o_war_corsair", p.store_links["GOG"])
     def test_linux_release(self):
-        p = self.bot.parse_dirname("Sphinx_And_The_Cursed_Mummy_Linux-Razor1911")
-        self.assertEqual("Linux", p["platform"])
-        self.assertEqual("Game", p["type"])
+        p = parsing.parse_dirname("Sphinx_And_The_Cursed_Mummy_Linux-Razor1911", "nfo_link")
+        self.assertEqual(Platform.LINUX, p.platform)
+        self.assertEqual(ReleaseType.GAME, p.type)
+        self.assertIn("store.steampowered.com/app/606710", p.store_links["Steam"])
+        self.assertIn("gog.com/game/sphinx_and_the_cursed_mummy", p.store_links["GOG"])
     def test_dlc_explicit(self):
-        p = self.bot.parse_dirname("Fallout.4.Far.Harbor.DLC-CODEX")
-        self.assertEqual("DLC", p["type"])
+        p = parsing.parse_dirname("Fallout.4.Far.Harbor.DLC-CODEX", "nfo_link")
+        self.assertIn("store.steampowered.com/app/435881", p.store_links["Steam"])
+        self.assertEqual(ReleaseType.DLC, p.type)
     def test_dlc_implicit(self):
-        p = self.bot.parse_dirname("Euro.Truck.Simulator.2.Italia-CODEX")
-        self.assertEqual("DLC", p["type"])
-        self.assertIn("store.steampowered.com/app/558244", p["store_links"]["Steam"])
+        p = parsing.parse_dirname("Euro.Truck.Simulator.2.Italia-CODEX", "nfo_link")
+        self.assertEqual(ReleaseType.DLC, p.type)
+        self.assertIn("store.steampowered.com/app/558244", p.store_links["Steam"])
     def test_incl_dlc_update(self):
-        p = self.bot.parse_dirname("Wolfenstein.II.The.New.Colossus.Update.5.incl.DLC-CODEX")
-        self.assertEqual("Update", p["type"])
+        p = parsing.parse_dirname("Wolfenstein.II.The.New.Colossus.Update.5.incl.DLC-CODEX", "nfo_link")
+        self.assertEqual(ReleaseType.UPDATE, p.type)
+        self.assertIn("store.steampowered.com/app/612880", p.store_links["Steam"])
     def test_incl_dlc_release(self):
-        p = self.bot.parse_dirname("Mutiny.Incl.DLC-DARKSiDERS")
-        self.assertEqual("Game", p["type"])
+        p = parsing.parse_dirname("Mutiny.Incl.DLC-DARKSiDERS", "nfo_link")
+        self.assertEqual(ReleaseType.GAME, p.type)
     def test_score_steam(self):
-        p1 = self.bot.parse_dirname("BioShock_Infinite-FLT")
-        p2 = self.bot.parse_dirname("Duke.Nukem.Forever.Complete-PLAZA")
-        self.assertGreater(p1["score"], p2["score"])
+        p1 = parsing.parse_dirname("BioShock_Infinite-FLT", "nfo_link")
+        self.assertIn("store.steampowered.com/app/8870", p1.store_links["Steam"])
+        p2 = parsing.parse_dirname("Duke.Nukem.Forever.Complete-PLAZA", "nfo_link")
+        self.assertIn("store.steampowered.com/app/57900", p2.store_links["Steam"])
+        self.assertGreater(p1.score, p2.score)
     def test_non_steam(self):
-        p = self.bot.parse_dirname("Battlefield.1.REPACK-CPY")
-        self.assertIn("www.origin.com/usa/en-us/store/battlefield/battlefield-1", p["store_links"]["Origin"])
-        self.assertEqual(-1, p["score"])
-        self.assertEqual(-1, p["num_reviews"])
+        p = parsing.parse_dirname("Battlefield.1.REPACK-CPY", "nfo_link")
+        self.assertIn("www.origin.com/usa/en-us/store/battlefield/battlefield-1", p.store_links["Origin"])
+        self.assertEqual(-1, p.score)
+        self.assertEqual(-1, p.num_reviews)
     def test_gog_exclusive(self):
-        p = self.bot.parse_dirname("Dungeons.and.Dragons.Dragonshard.v2.0.0.10.Multilingual-DELiGHT")
-        self.assertIn("gog.com/game/dungeons_dragons_dragonshard", p["store_links"]["GOG"])
-        self.assertEqual(-1, p["score"])
+        # TODO: Actually use GOG API (gog.update_info)
+        p = parsing.parse_dirname("Dungeons.and.Dragons.Dragonshard.v2.0.0.10.Multilingual-DELiGHT", "nfo_link")
+        self.assertIn("gog.com/game/dungeons_dragons_dragonshard", p.store_links["GOG"])
+        self.assertEqual(-1, p.score)
+    def test_gog_exclusive2(self):
+        p = parsing.parse_dirname("Diablo.GOG.Classic-KaliMaaShaktiDe", "nfo_link")
+        self.assertIn("gog.com/game/diablo", p.store_links["GOG"])
     def test_score_non_steam(self):
-        p = self.bot.parse_dirname("Ode.RIP.MULTI12-SiMPLEX")
-        self.assertEqual(-1, p["score"])
+        p = parsing.parse_dirname("Ode.RIP.MULTI12-SiMPLEX", "nfo_link")
+        self.assertEqual(-1, p.score)
     def test_tags(self):
-        p = self.bot.parse_dirname("Teenage.Mutant.Ninja.Turtles.Portal.Power.RIP.MULTI8-SiMPLEX")
-        self.assertEqual(["RIP", "MULTI8"], p["tags"])
+        p = parsing.parse_dirname("The.Curious.Expedition.v1.3.7.1.MULTI.7.RIP-Unleashed", "nfo_link")
+        self.assertIn("gog.com/game/curious_expedition_the", p.store_links["GOG"])
+        self.assertEqual(["MULTI.7", "RIP"], p.tags)
     def test_skip_software(self):
-        p = self.bot.parse_dirname("Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED")
+        p = parsing.parse_dirname("Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED", "nfo_link")
         self.assertIsNone(p)
     def test_steam_package(self):
-        p = self.bot.parse_dirname("Farming.Simulator.17.Platinum.Edition.Update.v1.5.3-BAT")
-        self.assertEqual("Farming Simulator 17 - Platinum Edition", p["game_name"])
-        self.assertEqual("Update", p["type"])
-        self.assertIn("store.steampowered.com/sub/202103", p["store_links"]["Steam"])
+        p = parsing.parse_dirname("Farming.Simulator.17.Platinum.Edition.Update.v1.5.3-BAT", "nfo_link")
+        self.assertEqual("Farming Simulator 17 - Platinum Edition", p.game_name)
+        self.assertEqual(ReleaseType.UPDATE, p.type)
+        self.assertIn("store.steampowered.com/sub/202103", p.store_links["Steam"])
     def test_steam_package_with_dlc_first(self):
-        p = self.bot.parse_dirname("The.Witcher.3.Wild.Hunt.Game.of.The.Year.Edition-RELOADED")
-        self.assertEqual("The Witcher 3: Wild Hunt - Game of the Year Edition", p["game_name"])
-        self.assertEqual("Game", p["type"])
-        self.assertIn("store.steampowered.com/sub/124923", p["store_links"]["Steam"])
+        p = parsing.parse_dirname("The.Witcher.3.Wild.Hunt.Game.of.The.Year.Edition-RELOADED", "nfo_link")
+        self.assertEqual("The Witcher 3: Wild Hunt - Game of the Year Edition", p.game_name)
+        self.assertEqual(ReleaseType.GAME, p.type)
+        self.assertIn("store.steampowered.com/sub/124923", p.store_links["Steam"])
     def test_steam_bundle(self):
-        p = self.bot.parse_dirname("Valve.Complete.Pack-FAKE")
-        self.assertEqual("Valve.Complete.Pack-FAKE", p["dirname"])
-        self.assertEqual("Valve Complete Pack", p["game_name"])
-        self.assertEqual("Windows", p["platform"])
-        self.assertEqual("Game", p["type"])
-        self.assertIn("store.steampowered.com/bundle/232", p["store_links"]["Steam"])
+        p = parsing.parse_dirname("Valve.Complete.Pack-FAKE", "nfo_link")
+        self.assertEqual("Valve.Complete.Pack-FAKE", p.dirname)
+        self.assertEqual("Valve Complete Pack", p.game_name)
+        self.assertEqual("Windows", p.platform)
+        self.assertEqual(ReleaseType.GAME, p.type)
+        self.assertIn("store.steampowered.com/bundle/232", p.store_links["Steam"])
     def test_steam_denuvo(self):
         # "denuvo" occurs in the Steam EULA
-        p = self.bot.parse_dirname("Deus.Ex.Mankind.Divided-CPY")
-        self.assertEqual(["DENUVO"], p["highlights"])
+        p = parsing.parse_dirname("Deus.Ex.Mankind.Divided-CPY", "nfo_link")
+        self.assertEqual(["DENUVO"], p.highlights)
         # "denuvo" occurs in the Steam DRM notice
-        p = self.bot.parse_dirname("Yakuza.0-FAKE")
-        self.assertEqual(["DENUVO"], p["highlights"])
+        p = parsing.parse_dirname("Yakuza.0-FAKE", "nfo_link")
+        self.assertEqual(["DENUVO"], p.highlights)
     def test_episode_release(self):
-        p = self.bot.parse_dirname("Life.is.Strange.Before.the.Storm.Episode.3-CODEX")
+        p = parsing.parse_dirname("Life.is.Strange.Before.the.Storm.Episode.3-CODEX", "nfo_link")
self.assertEqual("Life is Strange: Before the Storm Episode 3", p["game_name"]) self.assertEqual("Life is Strange: Before the Storm Episode 3", p.game_name)
self.assertEqual("DLC", p["type"]) self.assertEqual(ReleaseType.DLC, p.type)
self.assertIn("store.steampowered.com/app/704740", p["store_links"]["Steam"]) self.assertIn("store.steampowered.com/app/704740", p.store_links["Steam"])
def test_season_and_episode_release(self): def test_season_and_episode_release(self):
p = self.bot.parse_dirname("Minecraft.Story.Mode.Season.Two.Episode.5.MacOSX-RELOADED") p = parsing.parse_dirname("Minecraft.Story.Mode.Season.Two.Episode.5.MacOSX-RELOADED", "nfo_link")
self.assertEqual("Minecraft Story Mode Season Two Episode 5", p["game_name"]) self.assertEqual("Minecraft Story Mode Season Two Episode 5", p.game_name)
def test_build_is_update(self): def test_build_is_update(self):
p = self.bot.parse_dirname("DUSK.Episode.1.Build.2.6-SKIDROW") p = parsing.parse_dirname("DUSK.Episode.1.Build.2.6-SKIDROW", "nfo_link")
self.assertEqual("Update", p["type"]) self.assertEqual(ReleaseType.UPDATE, p.type)
def test_prefer_steam_to_microsoft_store(self):
p = parsing.parse_dirname("Forgiveness-PLAZA", "nfo_link")
self.assertIn("store.steampowered.com/app/971120", p.store_links["Steam"])
if __name__ == '__main__': if __name__ == '__main__':
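The refactored tests above swap dict lookups like p["type"] for attribute access like p.type, which implies parse_dirname now returns a structured object rather than a plain dict. The real definition lives in dailyreleases.parsing and is not part of this diff; the following is only a rough sketch reconstructed from the assertions, so the class name, enum values, and defaults are all assumptions:

    # Hypothetical sketch, NOT the actual dailyreleases code: field names are
    # inferred from the test assertions; everything else is assumed.
    from dataclasses import dataclass, field
    from enum import Enum
    from typing import Dict, List

    class Platform(Enum):
        # Values assumed; the tests compare against Platform.LINUX but also
        # against the plain string "Windows", so the real enum may be str-based.
        WINDOWS = "Windows"
        LINUX = "Linux"
        OSX = "Mac OSX"

    class ReleaseType(Enum):
        # Member names (GAME, DLC, UPDATE) are taken from the tests.
        GAME = "Game"
        DLC = "DLC"
        UPDATE = "Update"

    @dataclass
    class ParsedDirname:  # hypothetical name
        dirname: str
        game_name: str
        platform: Platform
        type: ReleaseType
        store_links: Dict[str, str] = field(default_factory=dict)  # e.g. {"Steam": url}
        tags: List[str] = field(default_factory=list)
        highlights: List[str] = field(default_factory=list)
        score: int = -1        # -1 when no Steam review score is available
        num_reviews: int = -1

The -1 sentinels match the non-Steam and GOG-exclusive tests, where no review data exists.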

View file

@@ -1,44 +1,57 @@
 import unittest
 from datetime import datetime, timedelta

-from dailyreleases.main import DailyReleasesBot
+from dailyreleases import parsing
+from dailyreleases.parsing import Platform, ReleaseType
+from dailyreleases.predbs import PredbRelease


 class ParseReleasesTestCase(unittest.TestCase):
-    def setUp(self):
-        self.bot = DailyReleasesBot()
+    @classmethod
+    def setUpClass(cls) -> None:
+        cls.empty_releases = {platform: {release_type: [] for release_type in ReleaseType}
+                              for platform in Platform}
+
+    def test_dont_skip(self):
+        already_posted = set()
+        releases = [
+            PredbRelease("Aztez-DARKSiDERS", "nfo_link", datetime.now())
+        ]
+        parsed_releases = parsing.parse_releases(releases, already_posted)
+        self.assertEqual(parsed_releases[Platform.WINDOWS][ReleaseType.GAME][0].game_name, "Aztez")

     def test_skip_already_posted(self):
         already_posted = {"Aztez-DARKSiDERS"}
-        releases = {
-            "Aztez-DARKSiDERS": ("nfo_link", datetime.now())
-        }
-        parsed_releases = self.bot.parse_releases(releases, already_posted)
-        self.assertDictEqual(parsed_releases[0], dict())
+        releases = [
+            PredbRelease("Aztez-DARKSiDERS", "nfo_link", datetime.now())
+        ]
+        parsed_releases = parsing.parse_releases(releases, already_posted)
+        self.assertDictEqual(parsed_releases, self.empty_releases)

     def test_skip_blacklisted_word(self):
         already_posted = set()
-        releases = {
-            "Anthemion.Software.DialogBlocks.v5.15.LINUX.Incl.Keygen-AMPED": ("nfo_link", datetime.now())
-        }
-        parsed_releases = self.bot.parse_releases(releases, already_posted)
-        self.assertDictEqual(parsed_releases[0], dict())
+        releases = [
+            PredbRelease("Anthemion.Software.DialogBlocks.v5.15.LINUX.Incl.Keygen-AMPED", "nfo_link", datetime.now())
+        ]
+        parsed_releases = parsing.parse_releases(releases, already_posted)
+        self.assertDictEqual(parsed_releases, self.empty_releases)

     def test_skip_older_than_48hr(self):
         already_posted = set()
-        releases = {
-            "Aztez-DARKSiDERS": ("nfo_link", datetime.now() - timedelta(hours=50))
-        }
-        parsed_releases = self.bot.parse_releases(releases, already_posted)
-        self.assertDictEqual(parsed_releases[0], dict())
+        releases = [
+            PredbRelease("Aztez-DARKSiDERS", "nfo_link", datetime.now() - timedelta(hours=50))
+        ]
+        parsed_releases = parsing.parse_releases(releases, already_posted)
+        self.assertDictEqual(parsed_releases, self.empty_releases)

     def test_skip_no_data_for_software(self):
         already_posted = set()
-        releases = {
-            "Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED": ("nfo_link", datetime.now())
-        }
-        parsed_releases = self.bot.parse_releases(releases, already_posted)
-        self.assertDictEqual(parsed_releases[0], dict())
+        releases = [
+            PredbRelease("Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED", "nfo_link", datetime.now())
+        ]
+        parsed_releases = parsing.parse_releases(releases, already_posted)
+        self.assertDictEqual(parsed_releases, self.empty_releases)

 if __name__ == '__main__':
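These tests also document the new return shape of parse_releases: the empty_releases fixture is a nested mapping keyed first by Platform, then by ReleaseType, with a list of parsed releases at each leaf, and skipped releases (already posted, blacklisted, older than 48 hours, or unidentifiable software) simply leave every list empty. A hedged usage sketch, assuming the package layout from the imports in the diff and that PredbRelease is a simple (dirname, nfo_link, timestamp) record as the constructor calls suggest:

    # Usage sketch based only on what the tests exercise; "nfo_link" is a
    # placeholder string exactly as in the tests, not a real URL.
    from datetime import datetime

    from dailyreleases import parsing
    from dailyreleases.parsing import Platform, ReleaseType
    from dailyreleases.predbs import PredbRelease

    # One fresh, not-yet-posted release.
    releases = [PredbRelease("Aztez-DARKSiDERS", "nfo_link", datetime.now())]
    parsed = parsing.parse_releases(releases, set())

    # Accepted releases are grouped as parsed[platform][release_type];
    # per test_dont_skip, this prints "Aztez".
    for game in parsed[Platform.WINDOWS][ReleaseType.GAME]:
        print(game.game_name)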