"""Watch the Certificate Transparency stream and preview newly seen sites.

For every ``certificate_update`` event received from certstream, the first
non-wildcard domain of the certificate is fetched over HTTPS.  Responses that
are JSON, empty, or contain one of the ``cringe`` markers are dropped.  The
remaining ones are printed with their status code, and at most once every ten
seconds a not-yet-shown page is opened in a Firefox window driven by Selenium.
"""
import asyncio
import hashlib
from asyncio import Lock
from time import time

import certstream
import httpx
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from termcolor import colored

# Shared HTTP client; at most ten requests are in flight at once.
client = httpx.AsyncClient()
semaphore = asyncio.Semaphore(10)

# Guards the rate-limit state below and serialises use of the single browser.
lock = Lock()
last_open_time = 0

driver = webdriver.Firefox()
driver.set_page_load_timeout(9)

# Lower-case substrings; a response body containing any of them is skipped.
cringe = {
    "parkingcrew.net",
    "sale_banner",
    "sellerratings",
    "website coming soon",
    "parked",
    "parking",
    "domain",
    "related searches",
    "/lander",
    "window.park",
    "sorry, site could not be found",
}

# SHA-256 digests of page bodies that were already opened in the browser.
# Digests are stored instead of the bodies so the set stays small.
already_shown = set()

# Seconds that must elapse between two pages opened in the browser.
_OPEN_INTERVAL = 10

_HEADERS = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0",
}


def _open_allowed() -> bool:
    """Return True when the last browser open is more than the interval ago."""
    return last_open_time + _OPEN_INTERVAL < time()


async def handler(message: dict, context: dict) -> None:
    """Process one certstream message.

    Args:
        message: Decoded certstream event.  Only events whose
            ``message_type`` is ``"certificate_update"`` are handled.
        context: Passed by certstream; unused.
    """
    global last_open_time

    if message["message_type"] != "certificate_update":
        return

    domains = message["data"]["leaf_cert"]["all_domains"]
    # First concrete (non-wildcard) name on the certificate, if any.
    domain = next((d for d in domains if "*" not in d), None)
    if domain is None:
        return
    if "cpanel" in domain:
        return

    url = f"https://{domain}"
    try:
        async with semaphore:
            r = await client.get(url, timeout=5, headers=_HEADERS)
    except (httpx.HTTPError, httpx.InvalidURL):
        # InvalidURL is not an HTTPError subclass, so it is listed explicitly.
        print(colored("???", on_color="on_dark_grey"), domain)
        return

    # JSON responses are API endpoints, not pages worth looking at.
    try:
        r.json()
    except (ValueError, RecursionError):
        # Not parseable as JSON: keep going.
        pass
    else:
        return

    body = r.text
    if not body:
        return

    # Lower-case once instead of once per marker.
    lowered = body.lower()
    if any(marker in lowered for marker in cringe):
        return

    if r.status_code != 200:
        print(colored(str(r.status_code), on_color="on_yellow"), domain)
        return
    print(colored(str(r.status_code), on_color="on_cyan"), domain)

    # Cheap pre-check so handlers do not queue on the lock while a page loads.
    if not _open_allowed():
        return

    async with lock:
        # Re-check: another handler may have opened a page while we waited.
        if not _open_allowed():
            return
        digest = hashlib.sha256(body.encode("utf-8", "replace")).digest()
        if digest in already_shown:
            return
        already_shown.add(digest)
        last_open_time = time()
        try:
            # driver.get blocks until the page has loaded (or timed out), so
            # run it in a worker thread to keep the event loop responsive.
            await asyncio.to_thread(driver.get, url)
        except WebDriverException:
            print(colored("ERR", on_color="on_red"), domain)


async def main() -> None:
    """Run the blocking certstream listener in a thread and feed ``handler``."""
    loop = asyncio.get_running_loop()

    def callback(message: dict, context: dict) -> None:
        # Called from the listener thread; hand the work to the event loop.
        asyncio.run_coroutine_threadsafe(handler(message, context), loop=loop)

    await asyncio.to_thread(
        certstream.listen_for_events,
        message_callback=callback,
        url="wss://certstream.calidog.io/",
    )


if __name__ == "__main__":
    asyncio.run(main())