From e05e1362870e3bf52f326cae8103bb78d7226332 Mon Sep 17 00:00:00 2001 From: "Casper V. Kristensen" Date: Sun, 4 Aug 2024 23:40:46 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=8F=9D=EF=B8=8F=F0=9F=92=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 52 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/main.py b/main.py index 03f88f0..854562d 100644 --- a/main.py +++ b/main.py @@ -1,15 +1,36 @@ import asyncio -import webbrowser +from asyncio import Lock +from time import time import certstream import httpx +from selenium import webdriver from termcolor import colored client = httpx.AsyncClient() semaphore = asyncio.Semaphore(10) +lock = Lock() +last_open_time = 0 +driver = webdriver.Firefox() +driver.set_page_load_timeout(9) +cringe = { + "parkingcrew.net", + "sale_banner", + "sellerratings", + "website coming soon", + "parked", + "parking", + "domain", + "related searches", + "/lander", + "window.park", + "sorry, site could not be found", +} +already_shown = set() async def handler(message: dict, context: dict) -> None: + global last_open_time if message["message_type"] != "certificate_update": return @@ -19,21 +40,46 @@ async def handler(message: dict, context: dict) -> None: except StopIteration: return + if "cpanel" in domain: + return + url = f"https://{domain}" try: async with semaphore: - r = await client.head(url, timeout=1) + headers = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0", + } + r = await client.get(url, timeout=5, headers=headers) except httpx.HTTPError: print(colored("???", on_color="on_dark_grey"), domain) return + try: + r.json() + return + except: + 1337 + + if len(r.text) == 0: + return + + # TODO: performance + if any(c in r.text.lower() for c in cringe): + return + if r.status_code != 200: print(colored(str(r.status_code), on_color="on_yellow"), domain) return print(colored(str(r.status_code), on_color="on_cyan"), domain) - webbrowser.open_new_tab(url) + async with lock: + if last_open_time + 10 < time(): + if r.text in already_shown: + return + already_shown.add(r.text) + last_open_time = time() + driver.get(url) async def main() -> None: