
Rank releases with highlights (e.g. PROPER/DENUVO) higher than those without.

Casper V. Kristensen 2019-03-08 22:28:26 +01:00
parent 0b5cdd8e3d
commit de8e447954
Signed by: caspervk
GPG key ID: 289CA03790535054
2 changed files with 161 additions and 153 deletions
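
For illustration only (not part of the commit): a minimal sketch of the ranking rule described in the commit message, using plain dicts as a simplified stand-in for the bot's Release objects. The sort key now puts the number of highlights first, so a release with a PROPER/DENUVO highlight outranks even a very popular release without one.

    def popularity(release):
        # Same key as the new generation.py, on plain dicts for brevity.
        is_rip = "RIP" in [tag.upper() for tag in release["tags"]]
        return len(release["highlights"]), release["num_reviews"], not is_rip

    highlighted = {"highlights": ["PROPER"], "num_reviews": 12, "tags": []}
    popular = {"highlights": [], "num_reviews": 50000, "tags": []}
    popular_rip = {"highlights": [], "num_reviews": 50000, "tags": ["Rip"]}

    ranked = sorted([popular, popular_rip, highlighted], key=popularity, reverse=True)
    # ranked: highlighted (1, 12, True), then popular (0, 50000, True),
    # then popular_rip (0, 50000, False)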

dailyreleases/generation.py (new file, 158 additions)

@@ -0,0 +1,158 @@
import inspect
import logging
import re
import textwrap
import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Set

from . import util, reddit, predbs, parsing
from .config import CONFIG, DATA_DIR
from .parsing import ParsedReleases, Release, ReleaseType

logger = logging.getLogger(__name__)


def generate_post(parsed_releases: ParsedReleases) -> str:
    post = []
    for platform, platform_releases in parsed_releases.items():
        if sum(len(pr) for pr in platform_releases.values()) == 0:
            continue
        post.append(f"# {platform}")
        for release_type, releases in platform_releases.items():
            if not releases:
                continue

            # Releases in the tables are grouped by release group, and the groups are ordered according to the most
            # popular game within the group. Games are sorted by popularity internally in the groups as well.
            def popularity(release: Release):
                # The popularity of a game is defined by the number of reviews it has on Steam, however:
                # - We rank RIPs lower than non-RIPs so the same game released as both will sort the non-RIP first.
                # - Releases with highlights (e.g. PROPER/DENUVO) are always ranked highest.
                is_rip = "RIP" in [tag.upper() for tag in release.tags]
                return len(release.highlights), release.num_reviews, not is_rip

            group_order = defaultdict(lambda: (-1, False))
            for release in releases:
                group_order[release.group] = max(group_order[release.group], popularity(release))

            def order(release: Release):
                return (group_order[release.group],
                        release.group,  # ensure grouping if two groups share group_order
                        popularity(release))

            def row(release: Release):
                # The rows in the table containing updates will use the full rls_name as the name, while tables
                # containing game and DLC releases will show tags and highlights, as well as the stylized game_name.
                if release_type == ReleaseType.UPDATE:
                    name = f"[{release.rls_name}]({release.nfo_link})"
                else:
                    tags = " ({})".format(" ".join(release.tags)) if release.tags else ""
                    highlights = " **- {}**".format(", ".join(release.highlights)) if release.highlights else ""
                    name = "[{}{}]({}){}".format(util.markdown_escape(release.game_name),
                                                 tags,
                                                 release.nfo_link,
                                                 highlights)
                if release.score == -1:
                    reviews = "-"
                else:
                    num_reviews_humanized = util.humanize(release.num_reviews, precision=1, prefix="dec", suffix="")
                    reviews = f"{release.score:.0%} ({num_reviews_humanized})"
                stores = ", ".join(f"[{name}]({link})" for name, link in release.store_links.items())
                return name, release.group, stores, reviews

            post.append(f"| {release_type} | Group | Store | Score (Reviews) |")
            post.append("|:-|:-|:-|:-|")
            post.extend("| {} | {} | {} | {} |".format(*row(rls)) for rls in sorted(releases, key=order, reverse=True))
            post.append("")
            post.append("&nbsp;")
            post.append("")

        post.append("")
        post.append("")

    if not post:
        logger.warning("Post is empty!")
        post.append("No releases today! :o")

    # Add link to the previous release thread
    previous_post = reddit.get_previous_daily_post(CONFIG["reddit"]["posts_subreddit"])
    previous_post_date = re.search("daily release.*[(](.*)[)]", previous_post.title, flags=re.IGNORECASE).group(1)
    post.append(f"# [<< {previous_post_date}]({previous_post.url})")

    # Add epilogue
    try:
        with DATA_DIR.joinpath("epilogue.txt").open() as file:
            post.extend(line.rstrip() for line in file.readlines())
    except FileNotFoundError:
        logger.info("No epilogue.txt")

    # Convert post list to string
    post_str = "\n".join(post)
    logger.debug("Generated post:\n%s", post_str)
    return post_str


def generate(post=False, pm_recipients=None) -> None:
    logger.info("-------------------------------------------------------------------------------------------------")
    start_time = time.time()

    already_posted = load_already_posted()
    releases = predbs.get_releases()
    # Remove old releases from already_posted to save space
    already_posted.intersection_update(releases)

    parsed_releases = parsing.parse_releases(releases, already_posted)

    # The date of the post changes at midday instead of midnight to allow calling the script shortly after 00:00
    title = f"Daily Releases ({(datetime.today() - timedelta(hours=12)).strftime('%B %-d, %Y')})"
    generated_post = generate_post(parsed_releases)
    generated_post_src = textwrap.indent(generated_post, "    ")

    if post:
        # Post to the bot's own subreddit
        bot_subreddit = CONFIG["reddit"]["bot_subreddit"]
        reddit_src_post = reddit.submit_post(f"{title} - Source", generated_post_src, bot_subreddit)
        reddit_post = reddit.submit_post(title, generated_post, bot_subreddit)

        # Manually approve the posts since reddit seems to think posts with many links are spam
        reddit_src_post.mod.approve()
        reddit_post.mod.approve()

        save_already_posted(already_posted)

        if pm_recipients is not None:
            msg = inspect.cleandoc(
                f"""
                [Preview]({reddit_post.url})
                [Source]({reddit_src_post.url})
                """
            )
            for recipient in pm_recipients:
                reddit.send_pm(recipient, title, msg)

    logger.info("Execution took %s seconds", int(time.time() - start_time))
    logger.info("-------------------------------------------------------------------------------------------------")


def load_already_posted() -> Set[str]:
    try:
        with DATA_DIR.joinpath("already_posted").open() as file:
            return {line.rstrip() for line in file.readlines()}
    except FileNotFoundError:
        return set()


def save_already_posted(already_posted) -> None:
    logger.info("Saving already posted to file")
    with DATA_DIR.joinpath("already_posted").open("w") as file:
        # Write one release per line to match the line-based format load_already_posted() reads
        file.writelines(f"{release}\n" for release in already_posted)
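
For illustration only (not part of the commit): how group_order above keeps a release group's rows adjacent in the table. Every release inherits its group's best popularity tuple as the primary sort key, so groups are ordered by their most popular game while releases stay clustered by group. Names and numbers below are made up.

    from collections import defaultdict

    # (game, group, popularity tuple) with made-up values
    releases = [
        ("GameA", "GRP1", (0, 100, True)),
        ("GameB", "GRP2", (0, 500, True)),
        ("GameC", "GRP1", (0, 900, True)),
    ]

    group_order = defaultdict(lambda: (-1, False))
    for game, group, pop in releases:
        group_order[group] = max(group_order[group], pop)

    rows = sorted(releases, key=lambda r: (group_order[r[1]], r[1], r[2]), reverse=True)
    # rows: GameC then GameA (GRP1, ranked by GameC's 900 reviews), then GameB (GRP2)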
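Also for illustration (not part of the commit): the 12-hour offset in generate() keeps the previous day's date in the post title until midday, so the bot can be run shortly after midnight. Note that %-d is platform-dependent (Linux/glibc).

    from datetime import datetime, timedelta

    run = datetime(2019, 3, 9, 0, 30)  # made-up run shortly after midnight
    print((run - timedelta(hours=12)).strftime("%B %-d, %Y"))  # March 8, 2019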


@@ -1,17 +1,10 @@
-import inspect
 import logging
-import re
-import textwrap
-import time
-from collections import defaultdict
-from datetime import datetime, timedelta
-from typing import Set
 import prawcore
-from . import __version__, util, reddit, predbs, parsing
-from .config import CONFIG, DATA_DIR
-from .parsing import ParsedReleases, Release, ReleaseType
+from . import __version__, reddit
+from .config import CONFIG
+from .generation import generate
 logger = logging.getLogger(__name__)
@@ -36,149 +29,6 @@ def listen_inbox() -> None:
             break
-def generate_post(parsed_releases: ParsedReleases) -> str:
-    post = []
-    for platform, platform_releases in parsed_releases.items():
-        if sum(len(pr) for pr in platform_releases.values()) == 0:
-            continue
-        post.append(f"# {platform}")
-        for release_type, releases in platform_releases.items():
-            if not releases:
-                continue
-
-            # Releases in the tables are grouped by release group, and the groups are ordered according to the most
-            # popular game within the group. Games are sorted by popularity internally in the groups as well.
-            def popularity(release: Release):
-                # The popularity of a game is defined by the number of reviews it has on Steam, however, we rank RIPs
-                # lower than non-RIPs so the same game released as both will sort the non-RIP first.
-                is_rip = "RIP" in [tag.upper() for tag in release.tags]
-                return release.num_reviews, not is_rip
-
-            group_order = defaultdict(lambda: (-1, False))
-            for release in releases:
-                group_order[release.group] = max(group_order[release.group], popularity(release))
-
-            def order(release: Release):
-                return (group_order[release.group],
-                        release.group,  # ensure grouping if two groups share group_order
-                        popularity(release))
-
-            def row(release: Release):
-                # The rows in the table containing updates will use the full rls_name as the name, while tables
-                # containing game and DLC releases will show tags and highlights, as well as the stylized game_name.
-                if release_type == ReleaseType.UPDATE:
-                    name = f"[{release.rls_name}]({release.nfo_link})"
-                else:
-                    tags = " ({})".format(" ".join(release.tags)) if release.tags else ""
-                    highlights = " **- {}**".format(", ".join(release.highlights)) if release.highlights else ""
-                    name = "[{}{}]({}){}".format(util.markdown_escape(release.game_name),
-                                                 tags,
-                                                 release.nfo_link,
-                                                 highlights)
-                if release.score == -1:
-                    reviews = "-"
-                else:
-                    num_reviews_humanized = util.humanize(release.num_reviews, precision=1, prefix="dec", suffix="")
-                    reviews = f"{release.score:.0%} ({num_reviews_humanized})"
-                stores = ", ".join(f"[{name}]({link})" for name, link in release.store_links.items())
-                return name, release.group, stores, reviews
-
-            post.append(f"| {release_type} | Group | Store | Score (Reviews) |")
-            post.append("|:-|:-|:-|:-|")
-            post.extend("| {} | {} | {} | {} |".format(*row(rls)) for rls in sorted(releases, key=order, reverse=True))
-            post.append("")
-            post.append("&nbsp;")
-            post.append("")
-
-        post.append("")
-        post.append("")
-
-    if not post:
-        logger.warning("Post is empty!")
-        post.append("No releases today! :o")
-
-    # Add link to the previous release thread
-    previous_post = reddit.get_previous_daily_post(CONFIG["reddit"]["posts_subreddit"])
-    previous_post_date = re.search("daily release.*[(](.*)[)]", previous_post.title, flags=re.IGNORECASE).group(1)
-    post.append(f"# [<< {previous_post_date}]({previous_post.url})")
-
-    # Add epilogue
-    try:
-        with DATA_DIR.joinpath("epilogue.txt").open() as file:
-            post.extend(line.rstrip() for line in file.readlines())
-    except FileNotFoundError:
-        logger.info("No epilogue.txt")
-
-    # Convert post list to string
-    post_str = "\n".join(post)
-    logger.debug("Generated post:\n%s", post_str)
-    return post_str
-
-
-def generate(post=False, pm_recipients=None) -> None:
-    logger.info("-------------------------------------------------------------------------------------------------")
-    start_time = time.time()
-
-    already_posted = load_already_posted()
-    releases = predbs.get_releases()
-    # Remove old releases from already_posted to save space
-    already_posted.intersection_update(releases)
-
-    parsed_releases = parsing.parse_releases(releases, already_posted)
-
-    # The date of the post changes at midday instead of midnight to allow calling the script shortly after 00:00
-    title = f"Daily Releases ({(datetime.today() - timedelta(hours=12)).strftime('%B %-d, %Y')})"
-    generated_post = generate_post(parsed_releases)
-    generated_post_src = textwrap.indent(generated_post, "    ")
-
-    if post:
-        # Post to the bot's own subreddit
-        bot_subreddit = CONFIG["reddit"]["bot_subreddit"]
-        reddit_src_post = reddit.submit_post(f"{title} - Source", generated_post_src, bot_subreddit)
-        reddit_post = reddit.submit_post(title, generated_post, bot_subreddit)
-
-        # Manually approve the posts since reddit seems to think posts with many links are spam
-        reddit_src_post.mod.approve()
-        reddit_post.mod.approve()
-
-        save_already_posted(already_posted)
-
-        if pm_recipients is not None:
-            msg = inspect.cleandoc(
-                f"""
-                [Preview]({reddit_post.url})
-                [Source]({reddit_src_post.url})
-                """
-            )
-            for recipient in pm_recipients:
-                reddit.send_pm(recipient, title, msg)
-
-    logger.info("Execution took %s seconds", int(time.time() - start_time))
-    logger.info("-------------------------------------------------------------------------------------------------")
-
-
-def load_already_posted() -> Set[str]:
-    try:
-        with DATA_DIR.joinpath("already_posted").open() as file:
-            return {line.rstrip() for line in file.readlines()}
-    except FileNotFoundError:
-        return set()
-
-
-def save_already_posted(already_posted) -> None:
-    logger.info("Saving already posted to file")
-    with DATA_DIR.joinpath("already_posted").open("w") as file:
-        # Write one release per line to match the line-based format load_already_posted() reads
-        file.writelines(f"{release}\n" for release in already_posted)
 
 
 def main() -> None:
     print(f"Starting Daily Releases Bot v{__version__}")
     mode = CONFIG["main"]["mode"]