Casper V. Kristensen 2022-06-12 18:45:25 +02:00
parent e00b09c5d8
commit 2dcef470ad
15 changed files with 329 additions and 196 deletions


@@ -2,6 +2,7 @@
The main terminal-based entry point. Invoke as `dailyreleases' or `python3 -m dailyreleases'.
"""
if __name__ == '__main__':
if __name__ == "__main__":
from .main import main
main()


@@ -14,7 +14,9 @@ from .config import DATA_DIR, CONFIG
logger = logging.getLogger(__name__)
connection = sqlite3.connect(DATA_DIR.joinpath("cache.sqlite"))
connection.row_factory = sqlite3.Row # allow accessing rows by index and case-insensitively by name
connection.row_factory = (
sqlite3.Row
) # allow accessing rows by index and case-insensitively by name
connection.text_factory = bytes # do not try to decode bytes as utf-8 strings
DEFAULT_CACHE_TIME = timedelta(seconds=CONFIG["web"].getint("cache_time"))
@@ -32,22 +34,27 @@ class Response:
def setup():
connection.execute("""
connection.execute(
"""
CREATE TABLE IF NOT EXISTS
requests (id INTEGER PRIMARY KEY,
url TEXT UNIQUE NOT NULL,
response BLOB NOT NULL,
timestamp INTEGER NOT NULL);
""")
"""
)
def clean(older_than=timedelta(days=3)):
connection.execute("""
connection.execute(
"""
DELETE FROM requests
WHERE timestamp < :cutoff;
""", {
""",
{
"cutoff": (datetime.utcnow() - older_than).timestamp(),
})
},
)
connection.commit()
connection.executescript("VACUUM;")
@@ -55,8 +62,14 @@ def clean(older_than=timedelta(days=3)):
last_request = defaultdict(float)
def get(url: str, params: Mapping = None, cache_time: timedelta = DEFAULT_CACHE_TIME,
ratelimit: Optional[float] = 1, *args, **kwargs) -> Response:
def get(
url: str,
params: Mapping = None,
cache_time: timedelta = DEFAULT_CACHE_TIME,
ratelimit: Optional[float] = 1,
*args,
**kwargs
) -> Response:
"""
Sends a GET request, caching the result for cache_time. If 'ratelimit' is supplied, requests are rate limited at the
host-level to this number of requests per second.
@@ -64,41 +77,51 @@ def get(url: str, params: Mapping = None, cache_time: timedelta = DEFAULT_CACHE_
if params is not None:
url += "?" + urllib.parse.urlencode(params)
request = Request(url, *args, **kwargs)
request.add_header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:65.0) Gecko/20100101 Firefox/65.0")
request.add_header(
"User-Agent",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:65.0) Gecko/20100101 Firefox/65.0",
)
#logger.debug("Get %s", url)
# logger.debug("Get %s", url)
row = connection.execute("""
row = connection.execute(
"""
SELECT response, timestamp
FROM requests
WHERE url = :url;
""", {
"url": url
}).fetchone()
""",
{"url": url},
).fetchone()
if row is not None and datetime.fromtimestamp(row["timestamp"]) > datetime.utcnow() - cache_time:
#logger.debug("Cache hit: %s", url)
if (
row is not None
and datetime.fromtimestamp(row["timestamp"]) > datetime.utcnow() - cache_time
):
# logger.debug("Cache hit: %s", url)
return Response(row["response"])
#logger.debug("Cache miss: %s", url)
# logger.debug("Cache miss: %s", url)
if ratelimit is not None:
min_interval = 1 / ratelimit
elapsed = time.time() - last_request[request.host]
wait = min_interval - elapsed
if wait > 0:
#logger.debug("Rate-limited for %ss", round(wait, 2))
# logger.debug("Rate-limited for %ss", round(wait, 2))
time.sleep(wait)
response = Response(urlopen(request).read())
last_request[request.host] = time.time()
connection.execute("""
connection.execute(
"""
INSERT OR REPLACE INTO requests(url, response, timestamp)
VALUES (:url, :response, :timestamp);
""", {
""",
{
"url": url,
"response": response.bytes,
"timestamp": datetime.utcnow().timestamp()
})
"timestamp": datetime.utcnow().timestamp(),
},
)
connection.commit()
return response
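
The get() helper above combines an SQLite response cache with per-host rate limiting. A minimal standalone sketch of just the rate-limiting part, assuming only the standard library (ratelimited_get and its signature are invented for illustration):

    import time
    import urllib.parse
    from collections import defaultdict

    last_request = defaultdict(float)  # host -> time.time() of the previous request

    def ratelimited_get(url: str, ratelimit: float = 1.0) -> None:
        # Sleep just long enough to keep requests to one host below `ratelimit` per second.
        host = urllib.parse.urlsplit(url).netloc
        wait = 1 / ratelimit - (time.time() - last_request[host])
        if wait > 0:
            time.sleep(wait)
        last_request[host] = time.time()
        # ...the actual urlopen() call and cache INSERT would go here...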


@@ -46,7 +46,7 @@ def logging_config(file, level, backup_count) -> dict:
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"formatter": "standard",
"level": level
"level": level,
},
"file": {
"class": "logging.handlers.TimedRotatingFileHandler",
@@ -55,18 +55,11 @@ def logging_config(file, level, backup_count) -> dict:
"filename": file,
"encoding": "utf-8",
"formatter": "standard",
"level": level
}
"level": level,
},
},
"loggers": {
"dailyreleases": {
"level": level
}
},
"root": {
"handlers": ["console", "file"],
"level": "WARNING"
}
"loggers": {"dailyreleases": {"level": level}},
"root": {"handlers": ["console", "file"], "level": "WARNING"},
}
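
The dict returned by logging_config() follows the standard logging.config.dictConfig schema, so applying it is a single call. A hedged usage sketch; the argument values and the import path are guesses, not taken from this diff:

    import logging
    import logging.config

    from dailyreleases.config import logging_config  # assumed module path

    logging.config.dictConfig(logging_config(file="bot.log", level="INFO", backup_count=7))
    logging.getLogger("dailyreleases").info("logging configured")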


@@ -24,7 +24,9 @@ def popularity(release: Release):
def row(release: Release):
# Bold row if Denuvo crack. We're checking this first so as to not actually insert 'DENUVO' as a highlight
highlights = [h for h in release.highlights if h not in ("DENUVO",)] # avoids modifying original release object
highlights = [
h for h in release.highlights if h not in ("DENUVO",)
] # avoids modifying original release object
bold = highlights != release.highlights
# The rows in the table containing updates will use the full rls_name as the name, while tables
@@ -34,22 +36,27 @@ def row(release: Release):
else:
tags = " ({})".format(" ".join(release.tags)) if release.tags else ""
highlights = " **- {}**".format(", ".join(highlights)) if highlights else ""
name = "[{}{}]({}){}".format(util.markdown_escape(release.game_name),
tags,
release.nfo_link,
highlights)
name = "[{}{}]({}){}".format(
util.markdown_escape(release.game_name), tags, release.nfo_link, highlights
)
stores = ", ".join(f"[{name}]({link})" for name, link in release.store_links.items())
stores = ", ".join(
f"[{name}]({link})" for name, link in release.store_links.items()
)
if release.score == -1:
reviews = "-"
else:
num_reviews_humanized = util.humanize(release.num_reviews, precision=1, prefix="dec", suffix="")
num_reviews_humanized = util.humanize(
release.num_reviews, precision=1, prefix="dec", suffix=""
)
reviews = f"{release.score:.0%} ({num_reviews_humanized})"
r = (name, release.group, stores, reviews)
if bold:
r = tuple(f"**{c.replace('**', '')}**" for c in r) # .replace ensures no nested bold, which is unsupported
r = tuple(
f"**{c.replace('**', '')}**" for c in r
) # .replace ensures no nested bold, which is unsupported
return r
@@ -70,18 +77,24 @@ def generate_post(releases: Releases) -> str:
# popular game within the group. Games are sorted by popularity internally in the groups as well.
group_order = defaultdict(lambda: (0, -1, False))
for release in type_releases:
group_order[release.group] = max(group_order[release.group], popularity(release))
group_order[release.group] = max(
group_order[release.group], popularity(release)
)
def order(release: Release):
return (group_order[release.group],
release.group, # ensure grouping if two groups share group_order
popularity(release))
return (
group_order[release.group],
release.group, # ensure grouping if two groups share group_order
popularity(release),
)
type_releases.sort(key=order, reverse=True)
post.append(f"| {type} | Group | Store | Score (Reviews) |")
post.append("|:-|:-|:-|:-|")
post.extend("| {} | {} | {} | {} |".format(*row(rls)) for rls in type_releases)
post.extend(
"| {} | {} | {} | {} |".format(*row(rls)) for rls in type_releases
)
post.append("")
post.append("&nbsp;")
@@ -96,7 +109,9 @@ def generate_post(releases: Releases) -> str:
# Add link to the previous release thread
previous_post = reddit.get_previous_daily_post(CONFIG["reddit"]["posts_subreddit"])
previous_post_date = re.search("daily release.*[(](.*)[)]", previous_post.title, flags=re.IGNORECASE).group(1)
previous_post_date = re.search(
"daily release.*[(](.*)[)]", previous_post.title, flags=re.IGNORECASE
).group(1)
post.append(f"# [<< {previous_post_date}]({previous_post.url})")
# Add epilogue
@@ -115,14 +130,14 @@ def generate_post(releases: Releases) -> str:
@util.retry(attempts=3, delay=120)
def generate(post=False, pm_recipients=None) -> None:
logger.info("-------------------------------------------------------------------------------------------------")
logger.info(
"-------------------------------------------------------------------------------------------------"
)
start_time = time.time()
processed = load_processed()
pres = predbs.get_pres()
releases = parsing.parse_pres(pre
for pre in pres
if pre.dirname not in processed)
releases = parsing.parse_pres(pre for pre in pres if pre.dirname not in processed)
# The date of the post changes at midday instead of midnight to allow calling script after 00:00
title = f"Daily Releases ({(datetime.utcnow() - timedelta(hours=12)).strftime('%B %-d, %Y')})"
@@ -133,7 +148,9 @@ def generate(post=False, pm_recipients=None) -> None:
if post:
# Post to bot's own subreddit
bot_subreddit = CONFIG["reddit"]["bot_subreddit"]
reddit_src_post = reddit.submit_post(f"{title} - Source", generated_post_src, bot_subreddit)
reddit_src_post = reddit.submit_post(
f"{title} - Source", generated_post_src, bot_subreddit
)
reddit_post = reddit.submit_post(title, generated_post, bot_subreddit)
# Manually approve posts since reddit seem to think posts with many links are spam
@@ -155,7 +172,9 @@ def generate(post=False, pm_recipients=None) -> None:
cache.clean()
logger.info("Execution took %s seconds", int(time.time() - start_time))
logger.info("-------------------------------------------------------------------------------------------------")
logger.info(
"-------------------------------------------------------------------------------------------------"
)
def load_processed() -> Set[str]:
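
The ordering logic above ranks whole groups by their most popular release, then sorts releases within each group. A simplified standalone sketch of that two-level sort, with popularity reduced to a single made-up integer instead of the project's tuple:

    from collections import defaultdict

    releases = [("CODEX", 10), ("PLAZA", 50), ("CODEX", 99), ("PLAZA", 5)]  # (group, popularity)

    group_order = defaultdict(int)
    for group, pop in releases:
        group_order[group] = max(group_order[group], pop)  # a group ranks as high as its best release

    releases.sort(key=lambda r: (group_order[r[0]], r[0], r[1]), reverse=True)
    print(releases)  # all CODEX rows first (best release: 99), then PLAZA, each sorted internally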


@@ -21,7 +21,9 @@ def listen_inbox() -> None:
if message.author in authorized_users:
generate(post=True, pm_recipients=(message.author.name,))
else:
logger.info("Discarding PM from %s: not authorized user", message.author)
logger.info(
"Discarding PM from %s: not authorized user", message.author
)
message.mark_read() # mark message read last so we can retry after potential fatal errors
except prawcore.PrawcoreException as e:
logger.warning("PrawcoreException: %s", e)
@@ -30,6 +32,7 @@ def listen_inbox() -> None:
print("Exiting (KeyboardInterrupt)")
break
def at_midnight() -> None:
while True:
try:
@@ -38,7 +41,9 @@ def at_midnight() -> None:
until_midnight = midnight - now
logger.info(f"Waiting {until_midnight} until midnight..")
sleep(until_midnight.total_seconds())
generate(post=True, pm_recipients=CONFIG["reddit"]["notify_users"].split(","))
generate(
post=True, pm_recipients=CONFIG["reddit"]["notify_users"].split(",")
)
except Exception as e:
logger.exception(e)
except KeyboardInterrupt:
@@ -55,7 +60,9 @@ def main() -> None:
if mode == "test":
generate(post=False)
if mode == "immediately":
generate(post=True, pm_recipients=CONFIG["reddit"]["notify_users"].split(","))
generate(
post=True, pm_recipients=CONFIG["reddit"]["notify_users"].split(",")
)
if mode == "midnight":
at_midnight()
if mode == "reply":
@@ -65,5 +72,5 @@ def main() -> None:
raise e
if __name__ == '__main__':
if __name__ == "__main__":
main()
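
The at_midnight() loop boils down to computing the timedelta until the next UTC midnight and sleeping it off. A minimal sketch of that calculation using only the standard library (the real loop also posts and handles errors):

    from datetime import datetime, time, timedelta

    now = datetime.utcnow()
    midnight = datetime.combine(now.date() + timedelta(days=1), time.min)
    until_midnight = midnight - now
    print(f"sleeping {until_midnight.total_seconds():.0f}s until {midnight}")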


@@ -43,7 +43,9 @@ class Release:
store_links: Dict[str, str] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
highlights: List[str] = field(default_factory=list)
score: int = -1 # score and number of reviews is -1 by default; it is updated if the game exists on Steam
score: int = (
-1
) # score and number of reviews is -1 by default; it is updated if the game exists on Steam
num_reviews: int = -1
@@ -111,12 +113,10 @@ BLACKLISTED = (
"Fedora",
"openSUSE",
"jQuery",
"CSS"
"ASP[._-]NET",
"CSS" "ASP[._-]NET",
"Windows[._-]Server",
"Lynda",
"OREILLY"
"Wintellectnow",
"OREILLY" "Wintellectnow",
"3ds[._-]?Max",
"For[._-]Maya",
"Cinema4D",
@@ -137,8 +137,11 @@ def parse_pre(pre: Pre, offline=False) -> Release:
rls_name, group = pre.dirname.rsplit("-", maxsplit=1)
# Find game name by matching until one of the stopwords
game_name, *stopwords = re.split("[._-]({})".format("|".join(STOPWORDS + TAGS + HIGHLIGHTS)),
rls_name, flags=re.IGNORECASE)
game_name, *stopwords = re.split(
"[._-]({})".format("|".join(STOPWORDS + TAGS + HIGHLIGHTS)),
rls_name,
flags=re.IGNORECASE,
)
# Prettify game name by substituting word delimiters with spaces
game_name = re.sub("[_-]", " ", game_name)
@@ -147,14 +150,18 @@ def parse_pre(pre: Pre, offline=False) -> Release:
game_name = re.sub("(\w{2,})[.]", "\g<1> ", game_name)
# Some stopwords distinguishes two otherwise identical releases (e.g. x86/x64) - we call these tags
tags = [stopword
for stopword in stopwords
if re.match("|".join(TAGS), stopword, flags=re.IGNORECASE)]
tags = [
stopword
for stopword in stopwords
if re.match("|".join(TAGS), stopword, flags=re.IGNORECASE)
]
# Some stopwords signify an important piece of information and deserve to be highlighted (e.g. PROPER)
highlights = [stopword
for stopword in stopwords
if re.match("|".join(HIGHLIGHTS), stopword, flags=re.IGNORECASE)]
highlights = [
stopword
for stopword in stopwords
if re.match("|".join(HIGHLIGHTS), stopword, flags=re.IGNORECASE)
]
# Find platform
if re.search("mac[._-]?os[._-]?x?", rls_name, flags=re.IGNORECASE):
@@ -166,9 +173,15 @@ def parse_pre(pre: Pre, offline=False) -> Release:
# Find release type (Game/DLC/Update)
# Order of the if-statements is important: Update trumps DLC because an update to a DLC is an update, not a DLC!
if re.search("update|v[0-9]|addon|Crack[._-]?fix|DIR[._-]?FIX|build[._-]?[0-9]+", rls_name, flags=re.IGNORECASE):
if re.search(
"update|v[0-9]|addon|Crack[._-]?fix|DIR[._-]?FIX|build[._-]?[0-9]+",
rls_name,
flags=re.IGNORECASE,
):
rls_type = ReleaseType.UPDATE
elif re.search("(?<!incl[._-])dlc", rls_name, flags=re.IGNORECASE): # 'Incl.DLC' isn't a DLC-release
elif re.search(
"(?<!incl[._-])dlc", rls_name, flags=re.IGNORECASE
): # 'Incl.DLC' isn't a DLC-release
rls_type = ReleaseType.DLC
else:
rls_type = ReleaseType.GAME
@@ -186,7 +199,7 @@ def parse_pre(pre: Pre, offline=False) -> Release:
type=rls_type,
platform=platform,
tags=tags,
highlights=highlights
highlights=highlights,
)
if offline:
@@ -204,19 +217,32 @@ def parse_pre(pre: Pre, offline=False) -> Release:
try:
steam.update_info(release)
except Exception as e: # a lot of stuff can go wrong with Steam's API, better catch everything
logger.error("Failed to update release info using Steam's API on %s", release)
logger.error(
"Failed to update release info using Steam's API on %s", release
)
logger.exception(e)
logger.info("Final : %s %s : %s - %s : %s", release.platform, release.type, release.game_name, release.group,
release)
logger.info(
"Final : %s %s : %s - %s : %s",
release.platform,
release.type,
release.game_name,
release.group,
release,
)
return release
Releases = Dict[Platform, Dict[ReleaseType, List[Release]]] # {Windows: {Game: [..], DLC: [..], ..}, Linux: ...}
Releases = Dict[
Platform, Dict[ReleaseType, List[Release]]
] # {Windows: {Game: [..], DLC: [..], ..}, Linux: ...}
def parse_pres(pres: Iterable[Pre]) -> Releases:
releases = {platform: {release_type: [] for release_type in ReleaseType} for platform in Platform}
releases = {
platform: {release_type: [] for release_type in ReleaseType}
for platform in Platform
}
for pre in pres:
try:
release = parse_pre(pre)
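
The stopword split in parse_pre() relies on re.split() with a capturing group, which keeps the matched stopwords in the result list. A standalone demonstration with a shortened, made-up stopword list (the project's STOPWORDS/TAGS/HIGHLIGHTS are much longer):

    import re

    STOPWORDS = ["Linux", "MacOSX?", "Update", "v[0-9]+"]

    rls_name = "Sphinx_And_The_Cursed_Mummy_Linux"
    game_name, *stopwords = re.split(
        "[._-]({})".format("|".join(STOPWORDS)), rls_name, flags=re.IGNORECASE
    )
    print(game_name)  # Sphinx_And_The_Cursed_Mummy
    print(stopwords)  # ['Linux', ''] -- the captured stopword plus the (empty) remainder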


@@ -22,7 +22,9 @@ def get_pres() -> List[Pre]:
pres = {}
for get in (get_predbme, get_xrel): # in reverse order of preference
try:
pres.update((p.dirname, p) for p in get()) # override duplicate dirnames in later iterations
pres.update(
(p.dirname, p) for p in get()
) # override duplicate dirnames in later iterations
except HTTPError as e:
logger.error(e)
logger.warning("Connection to predb failed, skipping..")
@@ -34,20 +36,23 @@ def get_xrel(categories=("CRACKED", "UPDATE"), num_pages=2) -> List[Pre]:
logger.debug("Getting pres from xrel.to")
def get_releases_in_category(category, page):
r = cache.get("https://api.xrel.to/v2/release/browse_category.json", params={
"category_name": category,
"ext_info_type": "game",
"per_page": 100,
"page": page
})
r = cache.get(
"https://api.xrel.to/v2/release/browse_category.json",
params={
"category_name": category,
"ext_info_type": "game",
"per_page": 100,
"page": page,
},
)
return r.json["list"]
return [Pre(rls["dirname"],
rls["link_href"],
datetime.fromtimestamp(rls["time"]))
for category in categories
for page in range(1, num_pages)
for rls in get_releases_in_category(category, page)]
return [
Pre(rls["dirname"], rls["link_href"], datetime.fromtimestamp(rls["time"]))
for category in categories
for page in range(1, num_pages)
for rls in get_releases_in_category(category, page)
]
def get_predbme() -> List[Pre]:
@@ -59,7 +64,4 @@ def get_predbme() -> List[Pre]:
# Predb.me doesn't show timestamps in the RSS-feed, but the feed is so short it only shows ~72 hours worth of
# releases anyway, so we just set timestamp to now.
now = datetime.utcnow()
return [Pre(item.find("title").text,
item.find("guid").text,
now)
for item in soup]
return [Pre(item.find("title").text, item.find("guid").text, now) for item in soup]
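
get_pres() deduplicates the two predbs by dirname, iterating the sources in reverse order of preference so the preferred source overwrites earlier entries. The same pattern, standalone, with dummy data:

    low_priority = [("Aztez-DARKSiDERS", "predbme"), ("Ode.RIP-SiMPLEX", "predbme")]
    high_priority = [("Aztez-DARKSiDERS", "xrel")]

    pres = {}
    for source in (low_priority, high_priority):  # reverse order of preference
        pres.update(source)  # later sources override duplicate dirnames

    print(pres)  # {'Aztez-DARKSiDERS': 'xrel', 'Ode.RIP-SiMPLEX': 'predbme'}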


@@ -24,8 +24,9 @@ def submit_post(title, text, subreddit) -> Submission:
def get_previous_daily_post(subreddit) -> Submission:
logger.info("Getting previous daily post from r/%s", subreddit)
posts = praw.subreddit(subreddit).search('title:"daily releases"', sort="new", syntax="lucene",
time_filter="week")
posts = praw.subreddit(subreddit).search(
'title:"daily releases"', sort="new", syntax="lucene", time_filter="week"
)
return next(
p
for p in posts


@@ -30,7 +30,7 @@ def find_store_links(game_name: str) -> Dict[str, str]:
"bigfishgames.com/games": "Big Fish Games",
"gamejolt.com": "Game Jolt",
"alawar.com": "Alawar",
"wildtangent.com": "WildTangent Games"
"wildtangent.com": "WildTangent Games",
}
# Multiple store links are sometimes returned, but we believe in Google's algorithm and choose the first one
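
A hedged sketch of how a domain table like the one above can be applied to a list of search-result URLs; the results and the loop are invented for illustration, only the domain-to-store-name idea comes from this diff:

    stores = {"store.steampowered.com": "Steam", "gog.com": "GOG", "wildtangent.com": "WildTangent Games"}
    results = ["https://example.org/review", "https://www.gog.com/game/celeste"]

    links = {}
    for link in results:
        for domain, store in stores.items():
            if domain in link and store not in links:  # first hit per store wins
                links[store] = link
    print(links)  # {'GOG': 'https://www.gog.com/game/celeste'}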


@@ -10,17 +10,16 @@ logger = logging.getLogger(__name__)
def search(query: str) -> Optional[str]:
logger.debug("Searching GOG for %s", query)
r = cache.get("https://www.gog.com/games/ajax/filtered", params={
"search": query,
"mediaType": "game",
"limit": 5
})
products = {p["title"]: p
for p in r.json["products"]
if p["isGame"]}
r = cache.get(
"https://www.gog.com/games/ajax/filtered",
params={"search": query, "mediaType": "game", "limit": 5},
)
products = {p["title"]: p for p in r.json["products"] if p["isGame"]}
try:
best_match = products[util.case_insensitive_close_matches(query, products, n=1, cutoff=0.90)[0]]
best_match = products[
util.case_insensitive_close_matches(query, products, n=1, cutoff=0.90)[0]
]
logger.debug("Best match is '%s'", best_match)
return "https://gog.com{url}".format(**best_match)
except IndexError:
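
Both the GOG and Steam lookups funnel through util.case_insensitive_close_matches() and treat an empty match list as "not found". The underlying difflib pattern, standalone and with invented titles:

    import difflib

    products = {"celeste": "Celeste", "celestial command": "Celestial Command"}  # lowered -> original
    matches = difflib.get_close_matches("CELESTE".lower(), products, n=1, cutoff=0.90)
    try:
        print("best match:", products[matches[0]])  # best match: Celeste
    except IndexError:
        print("no sufficiently close match")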


@@ -14,28 +14,32 @@ AppID = TypeVar("AppID", int, str)
def appdetails(appid: AppID) -> dict:
r = cache.get("https://store.steampowered.com/api/appdetails", params={
"appids": appid
})
r = cache.get(
"https://store.steampowered.com/api/appdetails", params={"appids": appid}
)
return r.json[str(appid)]["data"]
def packagedetails(appid: AppID) -> dict:
r = cache.get("https://store.steampowered.com/api/packagedetails", params={
"packageids": appid
})
r = cache.get(
"https://store.steampowered.com/api/packagedetails",
params={"packageids": appid},
)
return r.json[str(appid)]["data"]
def appreviews(appid: AppID) -> dict:
r = cache.get(f"https://store.steampowered.com/appreviews/{appid}", params={
"start_date": -1,
"end_date": -1,
"filter": "summary",
"language": "all",
"purchase_type": "all",
"json": 1
})
r = cache.get(
f"https://store.steampowered.com/appreviews/{appid}",
params={
"start_date": -1,
"end_date": -1,
"filter": "summary",
"language": "all",
"purchase_type": "all",
"json": 1,
},
)
return r.json["query_summary"]
@@ -59,26 +63,22 @@ def eula(appid: AppID) -> str:
def search(query: str) -> Optional[str]:
logger.debug("Searching Steam store for %s", query)
r = cache.get("https://store.steampowered.com/search/suggest", params={
"term": query,
"f": "json",
"cc": "US",
"l": "english"
})
r = cache.get(
"https://store.steampowered.com/search/suggest",
params={"term": query, "f": "json", "cc": "US", "l": "english"},
)
# Reverse results to make the first one take precedence over later ones if multiple results have the same name.
# E.g. "Wolfenstein II: The New Colossus" has both international and german version under the same name.
items = {item["name"]: item for item in reversed(r.json)}
try:
best_match = items[util.case_insensitive_close_matches(query, items, n=1, cutoff=0.90)[0]]
best_match = items[
util.case_insensitive_close_matches(query, items, n=1, cutoff=0.90)[0]
]
logger.debug("Best match is '%s'", best_match)
type_to_slug = {
"game": "app",
"dlc": "app",
"bundle": "bundle"
}
slug = type_to_slug.get(best_match['type'], best_match['type'])
type_to_slug = {"game": "app", "dlc": "app", "bundle": "bundle"}
slug = type_to_slug.get(best_match["type"], best_match["type"])
return f"https://store.steampowered.com/{slug}/{best_match['id']}"
except IndexError:
logger.debug("Unable to find %s in Steam search results", query)
@@ -91,7 +91,9 @@ def update_info(release: Release) -> None:
link_type, appid = re.search("(app|sub|bundle)(?:/)([0-9]+)", link).groups()
if link_type == "bundle":
logger.debug("Steam link is to bundle: not utilizing API") # Steam has no public API for bundles
logger.debug(
"Steam link is to bundle: not utilizing API"
) # Steam has no public API for bundles
return
# If the link is a package on Steam (e.g. game + dlc), we need to find the base game of the package
@@ -105,7 +107,9 @@ def update_info(release: Release) -> None:
# We guesstimate the base game as the most popular app (i.e. the one with most reviews) among the first three
package_appids = [app["id"] for app in package_details["apps"][:3]]
package_apps_details = [appdetails(appid) for appid in package_appids]
details = max(package_apps_details, key=lambda app: reviews(app["steam_appid"])[1])
details = max(
package_apps_details, key=lambda app: reviews(app["steam_appid"])[1]
)
appid = details["steam_appid"]
# Otherwise, if the release is a single game on Steam
@@ -123,6 +127,11 @@ def update_info(release: Release) -> None:
release.type = "DLC"
# Add highlight if "denuvo" occurs in Steam's DRM notice or potential 3rd-party EULA
if "denuvo" in details.get("drm_notice", "").lower() or "denuvo" in eula(appid).lower():
logger.info("'denuvo' found in Steam DRM-notice/EULA; adding 'DENUVO' to highlights")
if (
"denuvo" in details.get("drm_notice", "").lower()
or "denuvo" in eula(appid).lower()
):
logger.info(
"'denuvo' found in Steam DRM-notice/EULA; adding 'DENUVO' to highlights"
)
release.highlights.append("DENUVO")
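
update_info() above calls a reviews() helper that this diff doesn't show. Assuming it derives its (score, number of reviews) pair from the appreviews() query summary, the calculation is roughly the following sketch; the field names mirror Steam's appreviews response, and the hard-coded summary is made up:

    def score_from_summary(summary: dict) -> tuple:
        # Return (positive fraction, review count); (-1, -1) when there are no reviews.
        total = summary.get("total_reviews", 0)
        if total == 0:
            return -1, -1
        return summary["total_positive"] / total, total

    print(score_from_summary({"total_positive": 9354, "total_reviews": 10000}))  # (0.9354, 10000)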


@@ -11,11 +11,14 @@ logger = logging.getLogger(__name__)
def web_search(query: str) -> List[str]:
logger.debug("Searching Google for %s", query)
try:
r = cache.get("https://www.googleapis.com/customsearch/v1", params={
"key": CONFIG["google"]["key"],
"cx": CONFIG["google"]["cx"],
"q": query
})
r = cache.get(
"https://www.googleapis.com/customsearch/v1",
params={
"key": CONFIG["google"]["key"],
"cx": CONFIG["google"]["cx"],
"q": query,
},
)
return [result["link"] for result in r.json["items"]]
except (KeyError, HTTPError) as e:
logger.exception(e)

View file

@@ -14,31 +14,35 @@ def humanize(n: int, precision=2, prefix="bin", suffix="B") -> str:
"""
abbrevs = {
"dec": [
(1000 ** 5, 'P' + suffix),
(1000 ** 4, 'T' + suffix),
(1000 ** 3, 'G' + suffix),
(1000 ** 2, 'M' + suffix),
(1000 ** 1, 'k' + suffix)
(1000 ** 5, "P" + suffix),
(1000 ** 4, "T" + suffix),
(1000 ** 3, "G" + suffix),
(1000 ** 2, "M" + suffix),
(1000 ** 1, "k" + suffix),
],
"bin": [
(1 << 50, 'Pi' + suffix),
(1 << 40, 'Ti' + suffix),
(1 << 30, 'Gi' + suffix),
(1 << 20, 'Mi' + suffix),
(1 << 10, 'ki' + suffix)
]
(1 << 50, "Pi" + suffix),
(1 << 40, "Ti" + suffix),
(1 << 30, "Gi" + suffix),
(1 << 20, "Mi" + suffix),
(1 << 10, "ki" + suffix),
],
}
factor, suffix = next(((f, s) for f, s in abbrevs[prefix] if n >= f), (1, suffix))
return "{1:.{0}f}".format(precision, n / factor).rstrip("0").rstrip(".") + suffix
def case_insensitive_close_matches(word: str, possibilities: Sequence[str], n=3, cutoff=0.6) -> List[str]:
def case_insensitive_close_matches(
word: str, possibilities: Sequence[str], n=3, cutoff=0.6
) -> List[str]:
"""
Python's difflib.get_close_matches does case sensitive sequence matching, this function decorates the library
function to make it case insensitive.
"""
possibilities = {sequence.lower(): sequence for sequence in possibilities}
close_matches = difflib.get_close_matches(word.lower(), possibilities, n=n, cutoff=cutoff)
close_matches = difflib.get_close_matches(
word.lower(), possibilities, n=n, cutoff=cutoff
)
return [possibilities[m] for m in close_matches]
@@ -72,16 +76,21 @@ def retry(attempts=3, delay=0):
"""
Retry wrapped function `attempts` times.
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for i in range(1, attempts+1):
for i in range(1, attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
logger.exception(f"{func.__name__} attempt {i}/{attempts}", exc_info=e)
logger.exception(
f"{func.__name__} attempt {i}/{attempts}", exc_info=e
)
if i >= attempts:
raise
time.sleep(delay)
return wrapper
return decorator
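
A usage sketch for the retry decorator above, with a deliberately flaky function to show the retry-then-raise behaviour (the import path is assumed; attempts and delay are arbitrary):

    from dailyreleases.util import retry  # assumed module path

    calls = {"n": 0}

    @retry(attempts=3, delay=0)
    def flaky() -> str:
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    print(flaky(), "after", calls["n"], "attempts")  # ok after 3 attempts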


@@ -1,5 +1,6 @@
# Always prefer setuptools over distutils
from setuptools import setup, find_packages
# To use a consistent encoding
from codecs import open
from os import path
@@ -18,9 +19,7 @@ setup(
description="A reddit bot that consolidates scene releases",
long_description=long_description,
long_description_content_type="text/markdown",
project_urls={
"Source": "https://git.caspervk.net/caspervk/dailyreleases.git"
},
project_urls={"Source": "https://git.caspervk.net/caspervk/dailyreleases.git"},
author=__author__,
classifiers=[
"Development Status :: 3 - Alpha",
@@ -31,18 +30,7 @@ setup(
license=__licence__,
packages=find_packages(exclude=["tests"]),
include_package_data=True,
package_data={
"dailyreleases": [
"*.default"
]
},
install_requires=[
"praw==6.4.0",
"beautifulsoup4==4.7.1"
],
entry_points={
"console_scripts": [
"dailyreleases = dailyreleases.main:main"
]
},
)
package_data={"dailyreleases": ["*.default"]},
install_requires=["praw==6.4.0", "beautifulsoup4==4.7.1"],
entry_points={"console_scripts": ["dailyreleases = dailyreleases.main:main"]},
)


@@ -22,17 +22,25 @@ class ParseDirnameTestCase(unittest.TestCase):
self.assertEqual([], r.highlights)
def test_error_on_blacklisted_word(self):
pre = Pre("Anthemion.Software.DialogBlocks.v5.15.LINUX.Incl.Keygen-AMPED", "nfo_link", datetime.utcnow())
pre = Pre(
"Anthemion.Software.DialogBlocks.v5.15.LINUX.Incl.Keygen-AMPED",
"nfo_link",
datetime.utcnow(),
)
with self.assertRaisesRegex(ParseError, "Contains blacklisted word"):
parsing.parse_pre(pre)
def test_error_on_old(self):
pre = Pre("Aztez-DARKSiDERS", "nfo_link", datetime.utcnow() - timedelta(hours=50))
pre = Pre(
"Aztez-DARKSiDERS", "nfo_link", datetime.utcnow() - timedelta(hours=50)
)
with self.assertRaisesRegex(ParseError, "Older than 48 hours"):
parsing.parse_pre(pre)
def test_error_on_software(self):
pre = Pre("Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED", "nfo_link", datetime.utcnow())
pre = Pre(
"Tecplot.RS.2017.R1.v1.2.85254.X64-AMPED", "nfo_link", datetime.utcnow()
)
with self.assertRaisesRegex(ParseError, "No store link: probably software"):
parsing.parse_pre(pre)
@@ -43,7 +51,11 @@ class ParseDirnameTestCase(unittest.TestCase):
self.assertEqual("Battlefield.1-CPY", r.dirname)
def test_update(self):
pre = Pre("Car.Mechanic.Simulator.2018.Plymouth.Update.v1.5.1.Hotfix-PLAZA", "nfo_link", datetime.utcnow())
pre = Pre(
"Car.Mechanic.Simulator.2018.Plymouth.Update.v1.5.1.Hotfix-PLAZA",
"nfo_link",
datetime.utcnow(),
)
r = parsing.parse_pre(pre)
self.assertEqual(ReleaseType.UPDATE, r.type)
self.assertIn("store.steampowered.com/app/754920", r.store_links["Steam"])
@@ -55,7 +67,9 @@ class ParseDirnameTestCase(unittest.TestCase):
self.assertIn("store.steampowered.com/app/705120", r.store_links["Steam"])
def test_macos_release(self):
pre = Pre("The_Fall_Part_2_Unbound_MacOS-Razor1911", "nfo_link", datetime.utcnow())
pre = Pre(
"The_Fall_Part_2_Unbound_MacOS-Razor1911", "nfo_link", datetime.utcnow()
)
r = parsing.parse_pre(pre)
self.assertEqual(Platform.OSX, r.platform)
self.assertEqual(ReleaseType.GAME, r.type)
@@ -63,7 +77,11 @@ class ParseDirnameTestCase(unittest.TestCase):
self.assertIn("gog.com/game/the_fall_part_2_unbound", r.store_links["GOG"])
def test_macosx_update(self):
pre = Pre("Man_O_War_Corsair_Warhammer_Naval_Battles_v1.3.2_MacOSX-Razor1911", "nfo_link", datetime.utcnow())
pre = Pre(
"Man_O_War_Corsair_Warhammer_Naval_Battles_v1.3.2_MacOSX-Razor1911",
"nfo_link",
datetime.utcnow(),
)
r = parsing.parse_pre(pre)
self.assertEqual(Platform.OSX, r.platform)
self.assertEqual(ReleaseType.UPDATE, r.type)
@@ -71,7 +89,9 @@ class ParseDirnameTestCase(unittest.TestCase):
self.assertIn("gog.com/game/man_o_war_corsair", r.store_links["GOG"])
def test_linux_release(self):
pre = Pre("Sphinx_And_The_Cursed_Mummy_Linux-Razor1911", "nfo_link", datetime.utcnow())
pre = Pre(
"Sphinx_And_The_Cursed_Mummy_Linux-Razor1911", "nfo_link", datetime.utcnow()
)
r = parsing.parse_pre(pre)
self.assertEqual(Platform.LINUX, r.platform)
self.assertEqual(ReleaseType.GAME, r.type)
@@ -91,7 +111,11 @@ class ParseDirnameTestCase(unittest.TestCase):
self.assertIn("store.steampowered.com/app/558244", r.store_links["Steam"])
def test_incl_dlc_update(self):
pre = Pre("Wolfenstein.II.The.New.Colossus.Update.5.incl.DLC-CODEX", "nfo_link", datetime.utcnow())
pre = Pre(
"Wolfenstein.II.The.New.Colossus.Update.5.incl.DLC-CODEX",
"nfo_link",
datetime.utcnow(),
)
r = parsing.parse_pre(pre)
self.assertEqual(ReleaseType.UPDATE, r.type)
self.assertIn("store.steampowered.com/app/612880", r.store_links["Steam"])
@@ -113,13 +137,20 @@ class ParseDirnameTestCase(unittest.TestCase):
def test_non_steam(self):
pre = Pre("Battlefield.1.REPACK-CPY", "nfo_link", datetime.utcnow())
r = parsing.parse_pre(pre)
self.assertIn("www.origin.com/usa/en-us/store/battlefield/battlefield-1", r.store_links["Origin"])
self.assertIn(
"www.origin.com/usa/en-us/store/battlefield/battlefield-1",
r.store_links["Origin"],
)
self.assertEqual(-1, r.score)
self.assertEqual(-1, r.num_reviews)
def test_gog_exclusive(self):
# TODO: Actually use GOG API (gog.update_info)
pre = Pre("Dungeons.and.Dragons.Dragonshard.v2.0.0.10.Multilingual-DELiGHT", "nfo_link", datetime.utcnow())
pre = Pre(
"Dungeons.and.Dragons.Dragonshard.v2.0.0.10.Multilingual-DELiGHT",
"nfo_link",
datetime.utcnow(),
)
r = parsing.parse_pre(pre)
self.assertIn("gog.com/game/dungeons_dragons_dragonshard", r.store_links["GOG"])
self.assertEqual(-1, r.score)
@@ -132,7 +163,9 @@ class ParseDirnameTestCase(unittest.TestCase):
def test_epic_games_exclusive(self):
pre = Pre("Journey-CODEX", "nfo_link", datetime.utcnow())
r = parsing.parse_pre(pre)
self.assertIn("epicgames.com/store/en-US/product/journey", r.store_links["Epic Games"])
self.assertIn(
"epicgames.com/store/en-US/product/journey", r.store_links["Epic Games"]
)
def test_score_non_steam(self):
pre = Pre("Ode.RIP.MULTI12-SiMPLEX", "nfo_link", datetime.utcnow())
@@ -140,7 +173,11 @@ class ParseDirnameTestCase(unittest.TestCase):
self.assertEqual(-1, r.score)
def test_tags(self):
pre = Pre("The.Curious.Expedition.v1.3.7.1.MULTI.7.RIP-Unleashed", "nfo_link", datetime.utcnow())
pre = Pre(
"The.Curious.Expedition.v1.3.7.1.MULTI.7.RIP-Unleashed",
"nfo_link",
datetime.utcnow(),
)
r = parsing.parse_pre(pre)
self.assertIn("gog.com/game/curious_expedition_the", r.store_links["GOG"])
self.assertEqual(["MULTI.7", "RIP"], r.tags)
@@ -149,13 +186,21 @@ class ParseDirnameTestCase(unittest.TestCase):
pre = Pre("Anno.2070.Complete.Edition-FAKE", "nfo_link", datetime.utcnow())
r = parsing.parse_pre(pre)
self.assertEqual("Anno 2070 Complete Edition", r.game_name)
self.assertGreaterEqual(r.num_reviews, 9354) # make sure we got the right game from the package
self.assertGreaterEqual(
r.num_reviews, 9354
) # make sure we got the right game from the package
self.assertIn("store.steampowered.com/sub/26683", r.store_links["Steam"])
def test_steam_package_with_dlc_first(self):
pre = Pre("The.Witcher.3.Wild.Hunt.Game.of.The.Year.Edition-RELOADED", "nfo_link", datetime.utcnow())
pre = Pre(
"The.Witcher.3.Wild.Hunt.Game.of.The.Year.Edition-RELOADED",
"nfo_link",
datetime.utcnow(),
)
r = parsing.parse_pre(pre)
self.assertEqual("The Witcher 3: Wild Hunt - Game of the Year Edition", r.game_name)
self.assertEqual(
"The Witcher 3: Wild Hunt - Game of the Year Edition", r.game_name
)
self.assertEqual(ReleaseType.GAME, r.type)
self.assertIn("store.steampowered.com/sub/124923", r.store_links["Steam"])
@@ -179,14 +224,22 @@ class ParseDirnameTestCase(unittest.TestCase):
self.assertEqual(["DENUVO"], r.highlights)
def test_episode_release(self):
pre = Pre("Life.is.Strange.Before.the.Storm.Episode.3-CODEX", "nfo_link", datetime.utcnow())
pre = Pre(
"Life.is.Strange.Before.the.Storm.Episode.3-CODEX",
"nfo_link",
datetime.utcnow(),
)
r = parsing.parse_pre(pre)
self.assertEqual("Life is Strange: Before the Storm Episode 3", r.game_name)
self.assertEqual(ReleaseType.DLC, r.type)
self.assertIn("store.steampowered.com/app/704740", r.store_links["Steam"])
def test_season_and_episode_release(self):
pre = Pre("Minecraft.Story.Mode.Season.Two.Episode.5.MacOSX-RELOADED", "nfo_link", datetime.utcnow())
pre = Pre(
"Minecraft.Story.Mode.Season.Two.Episode.5.MacOSX-RELOADED",
"nfo_link",
datetime.utcnow(),
)
r = parsing.parse_pre(pre)
self.assertEqual("Minecraft Story Mode Season Two Episode 5", r.game_name)
@@ -216,5 +269,5 @@ class ParseDirnameTestCase(unittest.TestCase):
self.assertEqual("GTA 5 The Complete Edition", r.game_name)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()