99 lines
2.3 KiB
Python
99 lines
2.3 KiB
Python
import asyncio
|
|
from asyncio import Lock
|
|
from time import time
|
|
|
|
import certstream
|
|
import httpx
|
|
from selenium import webdriver
|
|
from termcolor import colored
|
|
|
|
client = httpx.AsyncClient()
|
|
semaphore = asyncio.Semaphore(10)
|
|
lock = Lock()
|
|
last_open_time = 0
|
|
driver = webdriver.Firefox()
|
|
driver.set_page_load_timeout(9)
|
|
cringe = {
|
|
"parkingcrew.net",
|
|
"sale_banner",
|
|
"sellerratings",
|
|
"website coming soon",
|
|
"parked",
|
|
"parking",
|
|
"domain",
|
|
"related searches",
|
|
"/lander",
|
|
"window.park",
|
|
"sorry, site could not be found",
|
|
}
|
|
already_shown = set()
|
|
|
|
|
|
async def handler(message: dict, context: dict) -> None:
|
|
global last_open_time
|
|
if message["message_type"] != "certificate_update":
|
|
return
|
|
|
|
domains = message["data"]["leaf_cert"]["all_domains"]
|
|
try:
|
|
domain = next(d for d in domains if "*" not in d)
|
|
except StopIteration:
|
|
return
|
|
|
|
if "cpanel" in domain:
|
|
return
|
|
|
|
url = f"https://{domain}"
|
|
|
|
try:
|
|
async with semaphore:
|
|
headers = {
|
|
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0",
|
|
}
|
|
r = await client.get(url, timeout=5, headers=headers)
|
|
except httpx.HTTPError:
|
|
print(colored("???", on_color="on_dark_grey"), domain)
|
|
return
|
|
|
|
try:
|
|
r.json()
|
|
return
|
|
except:
|
|
1337
|
|
|
|
if len(r.text) == 0:
|
|
return
|
|
|
|
# TODO: performance
|
|
if any(c in r.text.lower() for c in cringe):
|
|
return
|
|
|
|
if r.status_code != 200:
|
|
print(colored(str(r.status_code), on_color="on_yellow"), domain)
|
|
return
|
|
|
|
print(colored(str(r.status_code), on_color="on_cyan"), domain)
|
|
async with lock:
|
|
if last_open_time + 10 < time():
|
|
if r.text in already_shown:
|
|
return
|
|
already_shown.add(r.text)
|
|
last_open_time = time()
|
|
driver.get(url)
|
|
|
|
|
|
async def main() -> None:
|
|
loop = asyncio.get_running_loop()
|
|
|
|
def callback(message: dict, context: dict) -> None:
|
|
asyncio.run_coroutine_threadsafe(handler(message, context), loop=loop)
|
|
|
|
await asyncio.to_thread(
|
|
certstream.listen_for_events,
|
|
message_callback=callback,
|
|
url="wss://certstream.calidog.io/",
|
|
)
|
|
|
|
|
|
asyncio.run(main())
|